IPC

Rust (Channels)

What is Channel?

Sender and Receiver are connected via Channel. They can send/recv data via channel.

Types Channels?

mpsc / Multi-Producer, Single-Consumer tokio async_channel / Multi-Producer, Multiple-Consumer
Producer, Consumer Multi Producer, 1 Consumer Multi Producer, Multi Consumer
Provided by Standard library Tokio runtime
Unbounded Unbounded means that the channel has no limit on the number of messages it can hold at a time.
Sender can continue sending messages indefinitely, they will accumulate in memory until the receiver consumes them.

Yes
Yes
Asynchronous Means that the sending and receiving operations are non-blocking
Sender and receiver can proceed independently

NO. This is synchronous, ie blocking
Operations are non-blocking ie No one waits on channel(neither sender, nor rcvr)
When all Senders or all Receivers are dropped, the channel becomes closed

Yes. This is Asynchronous ie non-blocking

Multi-Producer, Single-Consumer (mpsc)

1 Sender 1 Reciever (Unbounded, synchronous) 4 Sender 1 Reciever (Unbounded, synchronous) 1 Sender 1 Reciever (Bounded, synchronous)

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    match tx.send(1) {      //send() returns Result
        Ok(_o) => {
            println!("Sent");
        },
        Err(e) => println!("Sending error {}", e)
    };
    match rx.recv() {       //recv() returns Result
        Ok(o) => {
            println!("Recieved {}", o);
        },
        Err(e) => println!("Sending error {}", e)
    };
}
$ cargo run
Sent
Recieved 1
                        

use std::{sync::mpsc, thread};
fn main() {
    println!("Hello, world!");
    let mut handles = vec![];
    let (t,r) = mpsc::channel();
    for i in 0..3 {
        let tx = t.clone();
        // Create 3 threads using for loop
        // And push joinHandles inside vector
        handles.push(thread::spawn(move || {
            match tx.send(i) {
                Ok(_o) => {
                    println!("Sent by thread {}", i);
                },
                Err(e) => {
                    println!("Send Error {}", e);
                }
            };
        }));
    }
    drop(t);

    // Single Reciever, recieves all messages
    loop {
        match r.recv() {
            Ok(o) => {
                println!("Rcvd {}", o);
            },
            Err(e) => {
                println!("Recv Error {}", e);
                break;
            }
        };
    }

    // Wait on thread's Handles
    for h in handles {
        h.join().unwrap();
    }
}
                        

use std::thread;
use std::time::Duration;
use std::sync::mpsc;

fn main() {
    let mut handles = vec![];

    // Create a synchronous channel with a buffer size of 1
    let (t,r) = mpsc::sync_channel::(1);
    
    // Create 3 threads using a for loop
    for i in 0..3 {
        let tx = t.clone();     // Clone the sender for each thread
        // Spawn a new thread
        let handle = thread::spawn(move || {
            println!("Thread {} has finished", i);
            tx.send(i as i32).unwrap();
        });

        // Store the thread handle to join later
        handles.push(handle);
    }

    // Receive messages from threads
    for _ in 0..3 {
        let received = r.recv().unwrap(); // Receive from the channel
        println!("Received from thread: {}", received);
    }
    
    // Wait for all threads to finish
    for handle in handles {
        handle.join().unwrap();
    }

    println!("All threads have completed.");
}
$ cargo run
Thread 0 has finished
Thread 1 has finished
Thread 2 has finished
Received from thread: 0
Received from thread: 1
Received from thread: 2
All threads have completed.
                        

Multi-Producer, Multi-Consumer (Unbounded, Asynchronous)


# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
async-channel = {version = "1.5"}

# main.rs
use async_channel;
use tokio::task;
use tokio;
use tokio::sync::Mutex;
use std::sync::Arc;

async fn send_thread(t: async_channel::Sender>>, var: Arc>) {
    {
        let mut a= var.lock().await;        //returns MutexGuard<_, i32>
        *a += 1;
    }
    match t.send(var).await{    //returns Result<(), SendError>
        Ok(_o) => {
            println!("Sent Arc on channel");
        }, 
        Err(e) => {
            println!("Error: {}", e);
        }
    };

}
async fn recv_thread(r: async_channel::Receiver>>) {
    match r.recv().await{    //returns MutexGuard<_, i32>
        Ok(o) => {
            println!("Recieved Mutex");
            let a = o.lock().await;
            println!("{} ", *a);
        }, 
        Err(e) => {
            println!("Error: {}", e);
        }
    };
}
#[tokio::main]
async fn main() {
    //This would be passed between threads
    let var = Arc::new(Mutex::new(0));
    let mut handles = vec![];
    let (t, r) = async_channel::unbounded();
    // Create 3 tokio tasks, sending on channel
    for _ in 0..3 {
        let tx = t.clone();
        let var_clone = var.clone();
        let handle = task::spawn(send_thread(tx, var_clone));
        handles.push(handle);
    }

    for _ in 0..3 {
        let rx = r.clone();
        let handle = task::spawn(recv_thread(rx));
        handles.push(handle);
    }

    for h in handles {
        h.await.unwrap();
    }
}