Rust Channel(for IPC)
-
Sender <T> and Receiver <T> are connected via Channel. They can
send/recv data via channel.
Types of Channels
1. Multi-Producer, Single-Consumer (mpsc)
-
Provided by: Standard library
Unbounded? Yes. Means that the channel has no limit on the number of messages it can hold at a time. Sender can continue sending messages indefinitely, they will accumulate in memory until the receiver consumes them.
Asynchronous: No. Means that the sending and receiving operations are non-blocking Sender and receiver can proceed independently. This is synchronous(ie blocking)
| 1 Sender 1 Reciever (Unbounded, synchronous) | 4 Sender 1 Reciever (Unbounded, synchronous) | 1 Sender 1 Reciever (Bounded, synchronous) |
|---|---|---|
|
|
|
2. Multi-Producer, Multiple-Consumer
-
Provided by: tokio runtime
Unbounded: yes
Asynchronous: yes
multi consumer, producer(Unbounded, Asynchronous)
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
async-channel = {version = "1.5"}
# main.rs
use async_channel;
use tokio::task;
use tokio;
use tokio::sync::Mutex;
use std::sync::Arc;
async fn send_thread(t: async_channel::Sender <Arc<Mutex<i32>>>, var: Arc<Mutex<i32>>) {
{
let mut a= var.lock().await; //returns MutexGuard<_, i32>
*a += 1;
}
match t.send(var).await{ //returns Result<(), SendError<i32>>
Ok(_o) => {
println!("Sent Arc<Mutex> on channel");
},
Err(e) => {
println!("Error: {}", e);
}
};
}
async fn recv_thread(r: async_channel::Receiver<Arc<Mutex<i32>>>) {
match r.recv().await{ //returns MutexGuard<_, i32>
Ok(o) => {
println!("Recieved Mutex");
let a = o.lock().await;
println!("{} ", *a);
},
Err(e) => {
println!("Error: {}", e);
}
};
}
#[tokio::main]
async fn main() {
//This would be passed between threads
let var = Arc::new(Mutex::new(0));
let mut handles = vec![];
let (t, r) = async_channel::unbounded();
// Create 3 tokio tasks, sending on channel
for _ in 0..3 {
let tx = t.clone();
let var_clone = var.clone();
let handle = task::spawn(send_thread(tx, var_clone));
handles.push(handle);
}
for _ in 0..3 {
let rx = r.clone();
let handle = task::spawn(recv_thread(rx));
handles.push(handle);
}
for h in handles {
h.await.unwrap();
}
}