1. Mutex / Mutual Exclusion / Locking mechanism / Block / Sleep
- How Mutex is internally implemented?
While the lock is held, no other thread can enter the locked region of code. A mutex can only be released by the thread that locked it.
- Wake up?
If 1000 threads are waiting, a wake-up call may activate all 1000 of them (the thundering herd problem), but only one thread, chosen at the scheduler's discretion, gets the lock; the other 999 go back to sleep.
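As a rough illustration of the locking idea described above, a lock can be sketched on top of a single atomic flag. This is only a teaching sketch under stated assumptions: the class name `SpinLock` and the helper `count_with_spinlock` are hypothetical, and the loop busy-waits, whereas a real mutex such as `std::mutex` typically puts waiting threads to sleep with help from the operating system.

```cpp
#include <atomic>
#include <thread>
#include <vector>

// Hypothetical teaching class; NOT how std::mutex is implemented.
class SpinLock {
    std::atomic_flag flag = ATOMIC_FLAG_INIT;
public:
    void lock() {
        // test_and_set returns the previous value, so this loops
        // for as long as some other thread holds the lock.
        while (flag.test_and_set(std::memory_order_acquire)) {
            std::this_thread::yield(); // a real mutex would sleep here instead
        }
    }
    void unlock() { flag.clear(std::memory_order_release); }
};

// Increments a shared counter from `threads` threads, `iters` times each.
int count_with_spinlock(int threads, int iters) {
    SpinLock lock;
    int counter = 0;
    std::vector<std::thread> pool;
    for (int t = 0; t < threads; ++t) {
        pool.emplace_back([&] {
            for (int i = 0; i < iters; ++i) {
                lock.lock();
                ++counter;      // critical section
                lock.unlock();
            }
        });
    }
    for (auto& th : pool) th.join();
    return counter;
}
```

Calling `count_with_spinlock(4, 10000)` from `main` should return 40000, because every increment happens while the flag is set.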
CPP
Problems with Mutex
| Problem | Description |
|---|---|
| Priority Inversion | A lower-priority process is executing in the critical section (CS) when a higher-priority process is scheduled and preempts it. The lower-priority process still holds the lock, so the higher-priority process cannot enter the CS and ends up waiting on the lower-priority one. If the higher-priority thread/process busy-waits for the lock, the lower-priority process may never get the CPU (i.e. is never scheduled) and so never releases the lock. Can PI happen with user-level threads? Not when they are scheduled cooperatively, because there is no preemption. |
| Easy Deadlock | If mutexes are locked/unlocked in an inconsistent order, threads can easily deadlock. See the Dead-lock example. |
| Thread holding mutex panicked | If thread-1 panics while holding the lock, the lock may never be released, so threads waiting on it can hang or fail (in Rust the mutex becomes poisoned and `lock()` returns an error). |
| Mutex and data are separate entities | Thread-1 and thread-2 access the data through the mutex, but thread-3 changes the data without taking the mutex. This should not happen, yet nothing prevents it. Solutions: 1. Make the mutex and the data a single entity, as Rust does. 2. Always keep in mind that the data must not be touched outside the mutex guards. |
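The first solution in the last row (mutex and data as a single entity) can be approximated in C++ with a small wrapper. The sketch below is an assumption-laden illustration, not a standard library facility: the names `Guarded`, `with_lock` and `guarded_demo` are hypothetical. The value is private, so the only way to reach it is through a call that takes the lock first.

```cpp
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical wrapper: the value is private, so callers can only
// reach it while the lock is held.
template <typename T>
class Guarded {
    std::mutex m;
    T value;
public:
    explicit Guarded(T init) : value(std::move(init)) {}

    // Runs `f` on the protected value while holding the lock.
    template <typename F>
    auto with_lock(F&& f) -> decltype(f(std::declval<T&>())) {
        std::lock_guard<std::mutex> guard(m);
        return f(value);
    }
};

int guarded_demo() {
    Guarded<int> a(5);
    // Both threads must go through with_lock; there is no unlocked access path.
    std::thread t1([&] { a.with_lock([](int& v) { ++v; }); });
    std::thread t2([&] { a.with_lock([](int& v) { ++v; }); });
    t1.join();
    t2.join();
    return a.with_lock([](int& v) { return v; });
}
```

In this sketch `guarded_demo()` returns 7. Unlike Rust, C++ cannot stop a caller from leaking a reference to the value out of the callback, so the protection is a convention enforced by the interface rather than by the compiler.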
Creating Mutex
// Note the asynchronous nature of threads:
// thread-2 may run before thread-1.
#include <iostream>
#include <thread>
#include <mutex>
std::mutex m;
int a = 5;
void test(int tid) {
    m.lock();
    a++;
    std::cout << "Thread: " << tid;
    std::cout << ", a:" << a << std::endl;
    m.unlock();
}
int main() {
    std::thread t1(test, 1);
    std::thread t2(test, 2);
    t1.join();
    t2.join();
    return 0;
}
///Output without m.lock(), m.unlock()///
//Because unsynchronized access to global variables is not thread safe (one possible interleaving).
Thread: Thread: 1, a:7
2, a:6
///Output with m.lock(), m.unlock()///
Thread: 2, a:6
Thread: 1, a:7
Wrappers Around Mutex
-
A wrapper class manages the lock on a mutex; it does not own the mutex object itself. To create an object of any of these classes, a mutex is normally passed to the constructor.
1. lock_guard
lock_guard locks the mutex when it is constructed and holds that lock for its whole lifetime.
mutex mtx;
lock_guard<mutex> lgLock(mtx); //The lock on mtx is now owned by lgLock
- Templated class lock_guard
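lock_guard keeps the mutex locked for the duration of the enclosing scoped block.
lock_guard cannot be copied, because its copy constructor and operator= are deleted. Hence it is neither copy- nor move-assignable.
Why use it? If someone locks the mutex and forgets to unlock it, every thread waiting on that mutex blocks forever. lock_guard unlocks automatically in its destructor.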
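// Simplified sketch of the idea, not the actual standard library source
template <typename T>
class lock_guard {
    T& mtx; // Reference: the guard does not own the mutex object
public:
    explicit lock_guard(T& m) : mtx(m) {
        mtx.lock();
    }
    ~lock_guard() {
        mtx.unlock();
    }
    lock_guard(const lock_guard&) = delete;            // Cannot copy lock_guard
    lock_guard& operator=(const lock_guard&) = delete; // Cannot assign lock_guard
};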
#include <iostream>
#include <mutex>
#include <thread>
using namespace std;
mutex m;
void fun(const char* name, int loop) {
    //1. Creating the lock_guard object locks the mutex. Same as m.lock()
    lock_guard<mutex> lgLock(m);
    for (int i = 0; i < loop; ++i) {
        cout << name << ": " << i << endl;
    }
    //2. No need to call m.unlock(). This is done in the destructor of the lock_guard object
}
int main() {
    thread t1(fun, "T1", 3);
    thread t2(fun, "T2", 3);
    t1.join();
    t2.join();
    return 0;
}
$ g++ test.cpp -lpthread
$ ./a.out
T1: 0
T1: 1
T1: 2
T2: 0
T2: 1
T2: 2
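The benefit of unlocking in the destructor is easiest to see when the critical section throws. The sketch below is illustrative only; the names `demo_mtx`, `risky_update` and `mutex_released_after_throw` are hypothetical. It throws while a lock_guard is alive and then checks that the mutex can be locked again.

```cpp
#include <mutex>
#include <stdexcept>

std::mutex demo_mtx; // hypothetical name used only in this sketch

// Throws while the guard is alive; the guard's destructor still unlocks demo_mtx.
void risky_update(int& value) {
    std::lock_guard<std::mutex> guard(demo_mtx);
    ++value;
    throw std::runtime_error("failure inside the critical section");
}

// Returns true if the mutex is free again after the exception.
bool mutex_released_after_throw() {
    int value = 0;
    try {
        risky_update(value);
    } catch (const std::runtime_error&) {
        // Stack unwinding has already destroyed `guard` at this point.
    }
    bool free_again = demo_mtx.try_lock(); // can succeed only if nobody holds the lock
    if (free_again) demo_mtx.unlock();
    return free_again && value == 1;
}
```

With a manual `m.lock()` / `m.unlock()` pair, the same exception would skip the unlock call and leave the mutex locked.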
2. unique_lock
unique_lock<mutex> ulock(mtx); //Lock immediately
unique_lock<mutex> ulock(mtx, defer_lock); //Deferred locking: associate the mutex with the lock object but do not lock it yet
//Time-constrained attempts at locking: try_lock_for(), try_lock_until(). These need a timed mutex (here tmtx, assumed to be a timed_mutex)
unique_lock<timed_mutex> ulock(tmtx, defer_lock);
ulock.try_lock_for(chrono::milliseconds(5));
//Recursive locking: unique_lock also works with recursive_mutex
//unique_lock can be moved, i.e. it is MoveConstructible and MoveAssignable, but it cannot be copied (it is neither CopyConstructible nor CopyAssignable).
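The unique_lock features listed above can be tried out with a short sketch. The mutex names (`mtx_a`, `mtx_b`, `timed_mtx`) and the three helper functions are hypothetical and exist only for this illustration. Deferred locks are combined with `std::lock`, which acquires several locks without deadlocking regardless of the order in which they are passed.

```cpp
#include <chrono>
#include <mutex>
#include <thread>
#include <utility>

std::mutex mtx_a, mtx_b;     // hypothetical names for this sketch
std::timed_mutex timed_mtx;

// Deferred locking: build both locks unlocked, then acquire them together.
bool lock_both() {
    std::unique_lock<std::mutex> a(mtx_a, std::defer_lock);
    std::unique_lock<std::mutex> b(mtx_b, std::defer_lock);
    std::lock(a, b); // deadlock-avoiding acquisition of both locks
    return a.owns_lock() && b.owns_lock();
}

// Timed attempt: this thread holds timed_mtx, so the other thread's
// try_lock_for gives up once the timeout expires.
bool timed_attempt_fails_while_held() {
    std::unique_lock<std::timed_mutex> holder(timed_mtx); // locks immediately
    bool acquired = true;
    std::thread t([&] {
        std::unique_lock<std::timed_mutex> waiter(timed_mtx, std::defer_lock);
        acquired = waiter.try_lock_for(std::chrono::milliseconds(5));
    });
    t.join();
    return !acquired;
}

// Moving transfers ownership of the lock; the source no longer owns it.
bool move_transfers_ownership() {
    std::unique_lock<std::mutex> first(mtx_a);
    std::unique_lock<std::mutex> second(std::move(first));
    return !first.owns_lock() && second.owns_lock();
}
```

Each helper returns `true` when the behaviour matches the description; all locks are released automatically when the unique_lock objects go out of scope.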
POSIX
Creating mutex
#include <pthread.h>
#include <stdio.h>
int counter;
pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
void *fun(void *arg) {
    (void)arg; // Unused
    // The second thread to arrive sleeps until the first one unlocks the mutex
    pthread_mutex_lock(&lock);
    printf("Inside CS\n");
    pthread_mutex_unlock(&lock);
    return NULL;
}
int main(void) {
    pthread_t tid1, tid2; // Opaque thread handles (an integer type on Linux)
    pthread_create(&tid1, NULL, fun, NULL);
    pthread_create(&tid2, NULL, fun, NULL);
    pthread_join(tid1, NULL);
    pthread_join(tid2, NULL);
    return 0;
}
Rust
Creating Mutex
-
In Rust, the data and the mutex are not separate, i.e. the data can only be accessed through the mutex.
This solves the "mutex and data are separate entities" problem that is present in CPP.
use std::sync::Mutex;
fn main() {
    // Create a mutex and associate an i32 value with it (initial value = 5).
    let mtx = Mutex::new(5);
    {
        // After acquiring the lock, the data inside the mutex can be changed
        let mut n = mtx.lock().unwrap();
        *n = 6;
    }
    {
        // Print after acquiring the lock
        let n = mtx.lock().unwrap();
        println!("{}", n);
    }
}
Mutex in threads
Mutex without Arc used inside threads (code does not compile)
-
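Problem:
thread::spawn with a move closure moves the Mutex into the first thread's closure.
std::sync::Mutex does not implement the Copy trait,
so after that move it cannot be used again for the next thread.
To share one mutex between threads, it must be wrapped in a shared, reference-counted pointer (i.e. Arc).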
use std::sync::Mutex;
use std::thread;
fn main() {
    let mtx = Mutex::new(0); // Initialize a Mutex protecting an integer
    let mut handles = vec![];
    for i in 0..2 {
        let h = thread::spawn(move || { // Spawn threads
            let mut n = mtx.lock().unwrap();
            *n += 1; // Modify the protected data
        });
        handles.push(h);
    }
    for h in handles { // Wait for all threads to finish
        h.join().unwrap();
    }
}
$ cargo run
`mtx` has type `Mutex<i32>`, which does not implement the `Copy` trait
10 | let h = thread::spawn(move || {
| ------- value moved into closure here, in previous iteration of loop
Arc <Mutex>
-
How does putting the Mutex inside an Arc solve the problem?
Arc is an atomically reference-counted shared pointer.
The Mutex is not copied between threads; each Arc::clone only increments the reference count, and every clone points to the same Mutex.
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
    let mtx = Arc::new(Mutex::new(0)); // Initialize a Mutex protecting an integer
    let mut handles = vec![];
    for i in 0..2 {
        let c = Arc::clone(&mtx); // Each thread gets its own handle; only the reference count is incremented
        let h = thread::spawn(move || { // Spawn threads
            let mut n = c.lock().unwrap();
            *n += 1; // Modify the protected data
            println!("Thread {} incremented value to {}", i, *n);
        });
        handles.push(h);
    }
    for h in handles {
        h.join().unwrap(); // Wait for all threads to finish
    }
    // Final value after all increments
    let n = mtx.lock().unwrap();
    println!("Final value: {}", *n); // 2
}
$ cargo run
Thread 0 incremented value to 1
Thread 1 incremented value to 2
Final value: 2