IPC
Rust (Channels)
What is Channel?
Types Channels?
mpsc / Multi-Producer, Single-Consumer | tokio async_channel / Multi-Producer, Multiple-Consumer | |
---|---|---|
Producer, Consumer | Multi Producer, 1 Consumer | Multi Producer, Multi Consumer |
Provided by | Standard library | Tokio runtime |
Unbounded |
Unbounded means that the channel has no limit on the number of messages it can hold at a time. Sender can continue sending messages indefinitely, they will accumulate in memory until the receiver consumes them. Yes |
Yes |
Asynchronous |
Means that the sending and receiving operations are non-blocking Sender and receiver can proceed independently NO. This is synchronous, ie blocking |
Operations are non-blocking ie No one waits on channel(neither sender, nor rcvr) When all Senders or all Receivers are dropped, the channel becomes closed Yes. This is Asynchronous ie non-blocking |
Multi-Producer, Single-Consumer (mpsc)
1 Sender 1 Reciever (Unbounded, synchronous) | 4 Sender 1 Reciever (Unbounded, synchronous) | 1 Sender 1 Reciever (Bounded, synchronous) |
---|---|---|
|
|
|
Multi-Producer, Multi-Consumer (Unbounded, Asynchronous)
# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
async-channel = {version = "1.5"}
# main.rs
use async_channel;
use tokio::task;
use tokio;
use tokio::sync::Mutex;
use std::sync::Arc;
async fn send_thread(t: async_channel::Sender>>, var: Arc>) {
{
let mut a= var.lock().await; //returns MutexGuard<_, i32>
*a += 1;
}
match t.send(var).await{ //returns Result<(), SendError>
Ok(_o) => {
println!("Sent Arc on channel");
},
Err(e) => {
println!("Error: {}", e);
}
};
}
async fn recv_thread(r: async_channel::Receiver>>) {
match r.recv().await{ //returns MutexGuard<_, i32>
Ok(o) => {
println!("Recieved Mutex");
let a = o.lock().await;
println!("{} ", *a);
},
Err(e) => {
println!("Error: {}", e);
}
};
}
#[tokio::main]
async fn main() {
//This would be passed between threads
let var = Arc::new(Mutex::new(0));
let mut handles = vec![];
let (t, r) = async_channel::unbounded();
// Create 3 tokio tasks, sending on channel
for _ in 0..3 {
let tx = t.clone();
let var_clone = var.clone();
let handle = task::spawn(send_thread(tx, var_clone));
handles.push(handle);
}
for _ in 0..3 {
let rx = r.clone();
let handle = task::spawn(recv_thread(rx));
handles.push(handle);
}
for h in handles {
h.await.unwrap();
}
}