Rust Channels (in-process message passing between threads / async tasks; not OS-level IPC)

A Sender<T> and a Receiver<T> are the two ends of a channel. The sender puts values of type T into the channel (send) and the receiver takes them out (recv), in the order they were sent.

Types of Channels

1. Multi-Producer, Single-Consumer (mpsc)

Provided by: Standard library
Unbounded? Yes, for mpsc::channel(). Unbounded means the channel has no limit on the number of messages it can hold at a time: the sender can keep sending indefinitely, and messages accumulate in memory until the receiver consumes them. (mpsc::sync_channel(n) is the bounded variant: it holds at most n messages.)
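Asynchronous (async/await): No. With an asynchronous channel, send and receive operations are awaited instead of blocking the thread, so sender and receiver can proceed independently. std::sync::mpsc is synchronous (i.e. blocking): recv() blocks the calling thread until a message arrives. send() never blocks on an unbounded channel(), but on a bounded sync_channel(n) it blocks while the buffer is full.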
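Examples below: (a) 1 Sender, 1 Receiver (unbounded, synchronous); (b) 3 sender threads, i.e. 4 Sender handles counting the original that is dropped, 1 Receiver (unbounded, synchronous); (c) 3 sender threads, 1 Receiver (bounded, synchronous)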

// Smallest possible channel example: one value is sent and then received
// on the same thread through an unbounded std::sync::mpsc channel.
fn main() {
    // channel() returns the two connected halves: (Sender, Receiver).
    let (tx, rx) = std::sync::mpsc::channel();

    // send() returns Result; it fails only if the receiver has been dropped.
    match tx.send(1) {
        Ok(()) => println!("Sent"),
        Err(e) => println!("Sending error {}", e),
    }

    // recv() blocks until a message is available and returns Result; it fails
    // once every sender is gone and the channel is empty.
    match rx.recv() {
        Ok(value) => println!("Recieved {}", value),
        // This arm reports a *receive* failure, so it must not say "Sending".
        Err(e) => println!("Receiving error {}", e),
    }
}
$ cargo run
Sent
Recieved 1
                    

use std::{sync::mpsc, thread};
// Several producer threads feed one consumer over an unbounded channel.
fn main() {
    println!("Hello, world!");
    let (sender, receiver) = mpsc::channel();

    // Spawn three producer threads. Each one owns its own clone of the sender,
    // and the JoinHandles are collected so the threads can be joined later.
    let workers: Vec<_> = (0..3)
        .map(|id| {
            let tx = sender.clone();
            thread::spawn(move || match tx.send(id) {
                Ok(()) => println!("Sent by thread {}", id),
                Err(e) => println!("Send Error {}", e),
            })
        })
        .collect();

    // Drop the original sender: recv() only reports an error (ending the loop
    // below) once *every* sender, including this one, has been dropped.
    drop(sender);

    // Single receiver drains all messages until the channel is closed.
    loop {
        let value = match receiver.recv() {
            Ok(v) => v,
            Err(e) => {
                println!("Recv Error {}", e);
                break;
            }
        };
        println!("Rcvd {}", value);
    }

    // Wait for every producer thread to finish.
    workers.into_iter().for_each(|w| w.join().unwrap());
}
                    

use std::thread;
use std::time::Duration;
use std::sync::mpsc;

// Several producer threads feed one consumer over a *bounded* channel.
fn main() {
    // Number of producer threads, and therefore of messages to receive.
    const WORKERS: i32 = 3;

    // Synchronous (bounded) channel with a buffer of one message: a send()
    // blocks while the buffer is full.
    let (sender, receiver) = mpsc::sync_channel::<i32>(1);

    // Spawn the producers, keeping each JoinHandle so it can be joined later.
    let workers: Vec<_> = (0..WORKERS)
        .map(|id| {
            // Every thread gets its own clone of the sender.
            let tx = sender.clone();
            thread::spawn(move || {
                // Note: this line is printed *before* the (possibly blocking) send.
                println!("Thread {} has finished", id);
                tx.send(id).unwrap();
            })
        })
        .collect();

    // Receive exactly one message per producer.
    for _ in 0..WORKERS {
        let received = receiver.recv().unwrap();
        println!("Received from thread: {}", received);
    }

    // Wait for all producer threads to exit.
    workers.into_iter().for_each(|w| w.join().unwrap());

    println!("All threads have completed.");
}
$ cargo run
Thread 0 has finished
Thread 1 has finished
Thread 2 has finished
Received from thread: 0
Received from thread: 1
Received from thread: 2
All threads have completed.
                    

2. Multi-Producer, Multiple-Consumer

Provided by: the async-channel crate (the tokio runtime is used here only to spawn and run the async tasks)
Unbounded: yes
Asynchronous: yes

Multi-producer, multi-consumer (unbounded, asynchronous)

# Cargo.toml
[dependencies]
tokio = { version = "1", features = ["full"] }
async-channel = {version = "1.5"}

# main.rs
use async_channel;
use tokio::task;
use tokio;
use tokio::sync::Mutex;
use std::sync::Arc;

// Producer task: increments the shared counter, then sends the shared
// Arc<Mutex<i32>> handle itself over the channel.
async fn send_thread(t: async_channel::Sender<Arc<Mutex<i32>>>, var: Arc<Mutex<i32>>) {
    // lock().await yields a MutexGuard; as a temporary it is dropped at the end
    // of this statement, so the lock is released before the send below.
    *var.lock().await += 1;

    // send() resolves to Result<(), SendError<Arc<Mutex<i32>>>>.
    let outcome = t.send(var).await;
    match outcome {
        Ok(()) => println!("Sent Arc<Mutex> on channel"),
        Err(e) => println!("Error: {}", e),
    }
}
// Consumer task: takes one Arc<Mutex<i32>> off the channel and prints the
// value it currently holds.
async fn recv_thread(r: async_channel::Receiver<Arc<Mutex<i32>>>) {
    // recv() resolves to Result<Arc<Mutex<i32>>, RecvError>.
    let shared = match r.recv().await {
        Ok(shared) => shared,
        Err(e) => {
            println!("Error: {}", e);
            return;
        }
    };

    println!("Recieved Mutex");
    // lock().await yields the MutexGuard used to read the inner i32.
    let guard = shared.lock().await;
    println!("{} ", *guard);
}
// Spawns three producer tasks and three consumer tasks that share one
// unbounded multi-producer, multi-consumer channel, then waits for all of them.
#[tokio::main]
async fn main() {
    // Shared counter; a handle to it is what gets passed between tasks.
    let counter = Arc::new(Mutex::new(0));
    let (sender, receiver) = async_channel::unbounded();

    // Three tokio tasks sending on the channel, each with its own Sender clone
    // and its own handle to the shared counter.
    let producers = (0..3).map(|_| task::spawn(send_thread(sender.clone(), counter.clone())));

    // Three tokio tasks receiving from the channel, each with its own Receiver clone.
    let consumers = (0..3).map(|_| task::spawn(recv_thread(receiver.clone())));

    // The iterators are lazy: collecting spawns the producers first, then the consumers.
    let handles: Vec<_> = producers.chain(consumers).collect();

    // Wait for every task; unwrap() panics if a task itself panicked.
    for handle in handles {
        handle.await.unwrap();
    }
}