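Synchronous (1 Producer, 1 Consumer)
The receiver blocks in recv() until a message arrives. With mpsc::channel() the channel is unbounded, so send() only queues the message and returns immediately; a sender that waits until the receiver takes the message requires mpsc::sync_channel(0).
When all Senders are dropped, or the Receiver is dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but messages already queued can still be received if the Receiver is still alive.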
use std::sync::mpsc; // mpsc: multiple producer, single consumer
use std::thread;
fn main() {
    // 1. Create a channel with two ends: transmitter (tx) and receiver (rx).
    let (tx, rx) = mpsc::channel();
    // 2. Thread 1 (transmitter) takes ownership of tx via `move` and sends val = "test" on it.
    let tid1 = thread::spawn(move || {
        let val = String::from("test");
        tx.send(val).unwrap();
        // println!("{}", val); // Compile error: send() takes ownership of val,
        //                      // so it can no longer be used in this thread.
    });
    // 3. Thread 2 (main) receives the message on the rx end of the channel.
    println!("{}", rx.recv().unwrap());
    tid1.join().unwrap(); // Wait for thread 1 to finish.
}
$ cargo run
test
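Whether a sender waits depends on which constructor creates the channel. The following is a minimal sketch, using only the standard library, that contrasts mpsc::sync_channel(0) (a rendezvous channel, where send() cannot return before a recv() takes the message) with mpsc::channel() (where send() returns without any receiver being ready). The function names and the 100 ms pause are illustrative assumptions, not part of the original example.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{mpsc, Arc};
use std::thread;
use std::time::Duration;

/// Rendezvous channel: reports whether send() had already returned
/// before the receiver called recv(). Expected to be false.
fn rendezvous_send_returned_early() -> bool {
    let (tx, rx) = mpsc::sync_channel::<String>(0); // capacity 0
    let sent = Arc::new(AtomicBool::new(false));
    let sent_in_thread = Arc::clone(&sent);
    let handle = thread::spawn(move || {
        tx.send(String::from("test")).unwrap(); // blocks until recv() is called
        sent_in_thread.store(true, Ordering::SeqCst);
    });
    thread::sleep(Duration::from_millis(100)); // give the sender time to run
    let returned_early = sent.load(Ordering::SeqCst); // checked before recv()
    let msg = rx.recv().unwrap(); // this releases the blocked sender
    handle.join().unwrap();
    assert_eq!(msg, "test");
    returned_early
}

/// Unbounded channel: the sender thread runs to completion before
/// recv() is ever called, and the queued message is still delivered.
fn unbounded_send_then_recv() -> String {
    let (tx, rx) = mpsc::channel::<String>();
    let handle = thread::spawn(move || tx.send(String::from("test")).unwrap());
    handle.join().unwrap(); // would deadlock here with sync_channel(0)
    rx.recv().unwrap()
}

fn main() {
    println!("rendezvous send returned early: {}", rendezvous_send_returned_early());
    println!("unbounded channel delivered: {}", unbounded_send_then_recv());
}
```

In the unbounded case the message is received even though the only Sender was dropped before recv() ran, which matches the closing rule: queued messages survive the senders.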
Synchronous (Multiple Producers, 1 Consumer)
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = tx.clone(); // Second transmitter for the same channel
    thread::spawn(move || {
        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("thread1"),
        ];
        for val in vals {
            tx1.send(val).unwrap(); // Sending over tx1
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("more"),
            String::from("messages"),
        ];
        for val in vals {
            tx.send(val).unwrap(); // Sending over tx
            thread::sleep(Duration::from_secs(1));
        }
    });
    // Iterating over rx blocks for each message and ends once tx and tx1 are both dropped.
    for i in rx {
        println!("{}", i);
    }
}
$ cargo run
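hi
more
from
messages
thread1
(One possible output. The two threads run concurrently, so the interleaving between them can differ from run to run, but each thread's own messages always arrive in the order they were sent.)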
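The receive loop in the example above terminates because tx and tx1 are each moved into a thread and dropped when that thread finishes, which closes the channel. The following is a minimal sketch of that closing rule using only the standard library; the function names and the id * 100 + n message values are illustrative assumptions.

```rust
use std::sync::mpsc;
use std::thread;

/// Spawns `producers` threads that each send `per_producer` numbers,
/// then collects everything until the channel closes.
fn collect_all(producers: usize, per_producer: usize) -> Vec<usize> {
    let (tx, rx) = mpsc::channel();
    for id in 0..producers {
        let tx = tx.clone(); // each thread owns its own Sender
        thread::spawn(move || {
            for n in 0..per_producer {
                tx.send(id * 100 + n).unwrap();
            }
            // tx is dropped here, when the closure ends
        });
    }
    drop(tx); // drop the original Sender, otherwise the iterator below never ends
    let mut got: Vec<usize> = rx.iter().collect(); // ends once every Sender is gone
    got.sort(); // arrival order across threads is not deterministic
    got
}

/// Messages queued before the last Sender was dropped can still be received;
/// after that, recv() returns an error.
fn drain_after_close() -> (Vec<i32>, bool) {
    let (tx, rx) = mpsc::channel();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    drop(tx);
    let drained = vec![rx.recv().unwrap(), rx.recv().unwrap()];
    (drained, rx.recv().is_err())
}

/// Sending after the Receiver is dropped fails and hands the value back.
fn send_after_receiver_dropped() -> Result<(), String> {
    let (tx, rx) = mpsc::channel::<String>();
    drop(rx);
    tx.send(String::from("late")).map_err(|e| e.0)
}

fn main() {
    println!("{:?}", collect_all(2, 3));
    println!("{:?}", drain_after_close());
    println!("{:?}", send_after_receiver_dropped());
}
```

Unlike the example above, this sketch clones the Sender inside a loop, so the original tx stays in the main thread and has to be dropped explicitly before collecting.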
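Asynchronous (1 Producer, 1 Consumer)
Neither the receiver nor the sender blocks its thread on the channel: send() and recv() return futures, and awaiting them suspends only the current task until the operation can complete. On an unbounded channel send() completes immediately; recv() waits while the channel is empty.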
When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received.
Kinds of channels: 1. Bounded channel with limited capacity. 2. Unbounded channel with unlimited capacity.
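The difference between the two kinds can be tried without an async runtime, because the standard library offers the same split: mpsc::sync_channel(cap) is bounded and mpsc::channel() is unbounded. The sketch below is a std analogue, not the async-channel API (there the constructors are async_channel::bounded(cap) and async_channel::unbounded()); the function names are illustrative assumptions.

```rust
use std::sync::mpsc::{self, TrySendError};

/// Tries to push `n` messages into a bounded channel of capacity `cap`
/// without ever receiving; returns how many were accepted.
fn accepted_by_bounded(cap: usize, n: usize) -> usize {
    // _rx is kept alive so the channel stays open; nothing is received.
    let (tx, _rx) = mpsc::sync_channel::<usize>(cap);
    let mut accepted = 0;
    for i in 0..n {
        match tx.try_send(i) {
            Ok(()) => accepted += 1,
            Err(TrySendError::Full(_)) => break, // buffer is at capacity
            Err(TrySendError::Disconnected(_)) => break, // receiver is gone
        }
    }
    accepted
}

/// Same experiment on an unbounded channel: every send succeeds.
fn accepted_by_unbounded(n: usize) -> usize {
    let (tx, _rx) = mpsc::channel::<usize>();
    (0..n).filter(|&i| tx.send(i).is_ok()).count()
}

fn main() {
    println!("bounded(2) accepted {} of 5", accepted_by_bounded(2, 5));
    println!("unbounded accepted {} of 5", accepted_by_unbounded(5));
}
```

A bounded channel gives backpressure: a fast producer is forced to wait (or handle the Full error) instead of growing the queue without limit.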
# Cargo.toml
[package]
name = "async_channel"
version = "0.1.0"
edition = "2018"
[dependencies]
async-channel = {version = "1.5"}
tokio = {version = "0.2.*", features = ["full"] }
# main.rs
async fn fun() {
    let (s, r) = async_channel::unbounded();
    // Send a message into the channel. On an unbounded channel this completes immediately.
    assert_eq!(s.send("Hello").await, Ok(()));
    // Receive a message from the channel. If the channel is empty, recv() waits until there is a message.
    assert_eq!(r.recv().await, Ok("Hello"));
}
fn main() {
    // 1. Start the tokio runtime (tokio 0.2 API, where block_on needs a mutable runtime).
    let mut rt = tokio::runtime::Runtime::new().unwrap();
    // 2. Create a LocalSet so the future runs on the current thread.
    let local = tokio::task::LocalSet::new();
    // 3. Drive fun() to completion.
    local.block_on(&mut rt, async move { fun().await });
}