Synchronous Asynchronous Channels


Synchronous vs Asynchronous Channel

Synchronous Channel (std::sync::mpsc::channel()) Asynchronous Channel (async_channel::unbounded())
Blocking Yes(sending and receiving messages on the channel will block the sender or receiver until the operation can be completed) No (used with async/await syntax)
Use Case For Tasks which want to synchronize their execution (Eg: in threads) working with async tasks and want to communicate between them without blocking

Code

Synchronous(1 Producer 1 Consumer)

  • Sender waits on channel until reciever recieves the message.
  • When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received.
  • 
            use std::sync::mpsc;                            //mpsc: Multiple producer, Single Consumer.
            use std::thread;
            
            fn main() {
                let (tx, rx) = mpsc::channel();            //1. Created channel having 2 ends(transmitter, reciever)
                
                let tid1 = thread::spawn(                  //2. Thread-1(Transmitter) owns tx(using move) and sends val=test on it
                    move || {
                        let val = String::from("test");
                        tx.send(val).unwrap();
                        //println!("{}", val);              //We cannot use value after sending over channel, bcoz reciever might change it.
                    }                                       //ie ownership is transferred.
                );
                
                println!("{}", rx.recv().unwrap());         //3. Thread-2(main) recieves message over rx end of channel.
            }
            $ cargo run
            test
        

    Synchronous(Multiple Producer 1 consumer)

    
                use std::sync::mpsc;
                use std::thread;
                use std::time::Duration;
                
                fn main() {
                    let (tx, rx) = mpsc::channel();
                    let tx1 = tx.clone();
                    
                    thread::spawn(move || {
                        let vals = vec![
                            String::from("hi"),
                            String::from("from"),
                            String::from("thread1"),
                        ];
                        for val in vals {
                            tx1.send(val).unwrap();                 //Sending over tx1
                            thread::sleep(Duration::from_secs(1));
                        }
                    });
                
                    thread::spawn(move || {
                        let vals = vec![
                            String::from("more"),
                            String::from("messages"),               
                        ];
                
                        for val in vals {
                            tx.send(val).unwrap();                    //Sending over tx
                            thread::sleep(Duration::from_secs(1));
                        }
                    });
                
                    for i in rx {
                        println!("{}", i);
                    }
                }
                $ cargo run
                hi more thread1 from messages                       //See threads are executed in different order hence values recieved in different order
        

    Asynchronous(1 Producer 1 Consumer)

  • Neither reciever, nor sender waits on channel(ie sender/reciever can send/recv and leave no need to wait).
  • When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent, but remaining messages can still be received.
  • Kinds of channels: 1. Bounded channel with limited capacity. 2. Unbounded channel with unlimited capacity.
  • 
                # Cargo.toml
                [package]
                name = "async_channel"
                version = "0.1.0"
                edition = "2018"
                
                [dependencies]
                async-channel = {version = "1.5"}
                tokio = {version = "0.2.*", features = ["full"] }
                
                # main.rs
                use async_channel;
                async fn fun() {
                    let (s, r) = async_channel::unbounded();
                
                    assert_eq!(s.send("Hello").await, Ok(())); //Sends a message into the channel.
                    assert_eq!(r.recv().await, Ok("Hello"));   //Receives a message from channel. If the channel is empty, this method waits until there is a message.
                }
                
                fn main() {
                    let mut rt = tokio::runtime::Runtime::new().unwrap(); //1. Start tokio runtime
                    let local = tokio::task::LocalSet::new();
                    local.block_on(&mut rt, async move { fun().await });
                }