Semaphores, Condition Variables, Readers/Writers
引入
上节介绍了如何通过 Lock 让不同 Threads 正确地合作,从而真正获得并行带来的性能提升,同时也介绍 Lock 的底层实现方法。但直接使用 Lock 容易出错,即便像 UNIX 这样稳定的系统,在其诞生 10 年后,仍然每周都可能因为并发 bug 导致系统崩溃。除了容易出错,Lock 的直接适用场景主要在于保证 critical section 的排他性,即 mutex,但仅有 mutex 并不能优雅地解决实践中遇到的并发编程场景,因此本节将介绍在 Lock 之上抽象出来的更高级别的并发编程原语 (primitive):
- Semaphore 
- Monitor 
Semaphore
Semaphore 是一种特殊的 Lock,由 Dijkstra 在 60 年代末期提出,是早期 UNIX 的并发编程的主要工具。
Semaphore 是一个非负整数,它支持一下两种原子操作:
P():每次执行时,等待 Semaphore 为整数时,将它减少 1
V():将 Semaphore 增加 1
注:这里的 P 表示 proberen,是测试的意思;V 表示 verhogen,是增加的意思,二者源自荷兰语
Semaphore 主要有两种使用场景:
Mutual Exclusion
当 Semaphore 的初始值为 1 时,它就是一个 Lock,可以用来保护 critical section:
semaphore.P();
// Critical section goes here
semaphore.V();Scheduling Constraints 
例如,当 Semaphore 的初始值为 0 时,可以用来实现 ThreadJoin:
semaphore = 0;
ThreadJoin() {
    semaphore.P();
}
ThreadFinish() {
    semaphore.V();
}Producer Consumer With A Bounded Buffer
问题描述:
- Producer 将数据放入 shared buffer 
- Consumer 将数据从 shared buffer 中读出 
- 需要协调好 Producer/Consumer 的工作,保证正确的前提下减少 CPU 资源浪费 
一个正确的解决方案需要满足:
- 当 buffer 为空时,Consumer 必须等待 Producer 放入数据后才能工作 (Scheduling Constraint) 
- 当 buffer 已满时,Producer 必须等待 Consumer 消费数据后才能工作 (Scheduling Constraint) 
- 任意时刻只能有一个 thread 操作 buffer (Mutual Exclusion) 
可以想象,我们需要使用 3 个 Semaphores,分别用于 Mutual Exclusion 和 Scheduling Constraints:
Semaphore fullBuffers;   // consumer's constraint
Semaphore emptyBuffers;  // producer's constraint
Semaphore mutex;         // mutual exclusion构建 Producer 与 Consumer 函数:
Semaphore fullBuffer = 0;
Semaphore emptyBuffers = numBuffers;
Semaphore mutex = 1;
Producer(item) {
    emptyBuffers.P();
    mutex.P();
    Enqueue(item);
    mutex.V();
    fullBuffers.V();
}
Consumer() {
    fullBuffers.P();
    mutex.P();
    item = Dequeue();
    mutex.V();
    emptyBuffers.V();
    return item;
}可以看到这里 Producer 与 Consumer 的逻辑并不对称:
- Producer: - emptyBuffer.P(), fullBuffer.V()
- Consumer: - fullBuffer.P(), emptyBuffer.V()
同时,P 操作的顺序很重要,下面的代码可能造成死锁:
Producer(item) {
    mutex.P();
    emptyBuffers.P();
    Enqueue(item);
    mutex.V();
    fullBuffers.V();
}但 V 操作的顺序不重要。如果有 N 个 producers,M 个 consumers,以上代码逻辑仍然适用。
Monitor
Semaphore 既可以用于 Mutual Exclusion,又可以用于 Scheduling Constraints,这种设计有时候会让使用者感到不知所措,不符合 "Do one thing and do it well" 的哲学;这也是 Monitor 被提出的目的:
Use locks for mutual exclusion and condition variables for scheduling constraints
Monitor 由两部分构成:
- 一个 Lock - 用于保护 shared data 
- 在获取 shared data 前必须加锁 
- 在使用 shared data 完毕后必须释放锁 
- Initially free 
 
- 零个或多个 condition variables - 每个 condition variable 背后都是一个 threads queue,等待进入某个 critical section 
- 与 Semaphore 不同,使用 condition variable 时,可以在 critical section 中等待资源,自动释放锁,被唤醒时自动加锁 
 
Simple Monitor Example
Lock lock;
Queue queue;
AddToQueue(item) {
    lock.Acquire();
    queue.enqueue(item);
    lock.Release();
}
RemoveFromQueue() {
    lock.Acquire();
    item = queue.dequeue();  // Get next item or null
    lock.Release();
    return(item);            // Might return null
}本例中仅仅使用了 lock,并未使用 condition variable,尽管它能达到相同的功能,但坏处是:当 queue 中没有数据时,consumer 不会进入等待状态,而是消费到空数据。更合理的逻辑是,当 queue 中没有数据时,consumer 进入等待状态 (sleep),知道 queue 有新的数据时才被唤醒。
Complete Monitor Example
Condition Variable 支持 3 种原子操作:
- Wait(&lock):释放 lock 的同时进入睡眠状态,唤醒前重新获取 lock 
- Signal():若存在,唤醒一个等待的 thread 
- Broadcast():若存在,唤醒所有等待的 threads 
Condition Variable 的使用原则:必须获取 lock 才能执行上述操作。
Lock lock;
Condition dateready;
Queue queue;
AddToQueue(item) {
    lock.Acquire();
    queue.enqueue(item);
    dataready.signal();
    lock.Release();
}
RemoveFromQueue() {
    lock.Acquire();
    while (queue.isEmpty()) {
        dataready.wait(&lock);
    }
    item = queue.dequeue();
    lock.Release();
    return item;
}这里需要特别关注:
while (queue.isEmpty()) {
    dataready.wait(&lock);
}为什么不是:
if (queue.isEmpty()) {
    dataready.wait(&lock);
}这取决于 scheduling 的策略:
- Hoare-style (most textbooks): - Signaler gives lock, CPU to waiter; waiter runs immediately 
- Waiter gives up lock, processor back to signaler when it exits critical section or if it waits again 
 
- Mesa-style (most real operating systems): - Signaler keeps lock and processor 
- Waiter placed on ready queue with no special priority 
- Practically, need to check condition again after wait 
 
Extended Example: Readers/Writers Problem

场景:有一个共享的数据库,有两种用户需要访问它:
- Readers:只读取数据,不修改数据 
- Writers:读取或修改数据 
使用一个全局锁可能导致数据库操作效率不高,我们希望:
- 多个 Readers 可以同时读取数据 
- 只有一个 Writer 可以操作数据 
具体限制如下:
- Readers 可以在没有 Writers 在等待或者操作时访问数据库 
- Writers 可以在没有其它 Writers 或者 Readers 在访问数据库时访问数据库 
- 每个时刻只有一个 thread 可以修改系统的状态变量 (state variables) 
状态变量包含:
- AR: Number of active readers; initially = 0 
- WR: Number of waiting readers; initially = 0 
- AW: Number of active writers; initially = 0 
- WW: Number of waiting writers; initially = 0 
- Condition: okToRead = NIL 
- Condition: okToWrite = NIL 
Reader() {
    // First check into system
    lock.Acquire();
    while ((AW + WW) > 0) {
        WR++;
        okToRead.wait(&lock);
        WR--;
    }
    AR++;
    lock.Release();
    
    // Perform actual read-only access
    
    // check out of system
    lock.Acquire();
    AR--;
    if (AR == 0 && WW > 0) {
        okToWrite.signal();
    }
    lock.Release();
}Writer() {
    // First check into system
    lock.Acquire();
    while ((AW+AR) > 0) {
        WW++;
        okToWrite.wait(&lock);
        WW--;
    }
    lock.Release();
    
    // Perform actual read/write access
    
    lock.Acquire();
    AW--;
    if (WW > 0) {
        okToWrite.signal();
    } else if (WR > 0) {
        okToRead.broadcast();
    }
    lock.Release();
}几个值得思考的问题:
- Readers 是否有可能被饿死? 
- 如果我们把 reader.c 中的 - if (AR == 0 && WW > 0) {}去掉,会怎样?
- 如果我们把 reader.c 中的 - okToWrite.signal();修改成- okToWrite.broadcast();会怎样?
利用 Semaphores 构建 Monitors?
上节介绍了 Monitor 由 lock 和 condition 组成,lock 的部分可以直接用 Semaphore = 1 解决,而 Condition 的部分则比较复杂。
V1
Wait() { semaphore.P(); }
Signal() { semaphore.V(); }这里的问题在于,在 Monitor 语义中,Wait 方法应在 thread 进入睡眠之前释放锁,当前解法会造成死锁。
V2
Wait(Lock lock) { 
    lock.Release();
    semaphore.P();
    lock.Acquire();
}
Signal() { semaphore.V(); }这里 P、V 是 communicative 的,与 condition 的 Wait 和 Signal 语义不同,Signal 的发生对后续的 Wait 没有影响,而 V 操作对后续的 P 操作则有深远的影响。
V3
Wait(Lock lock) { 
    lock.Release();
    semaphore.P();
    lock.Acquire();
}
Signal() { 
    if (semaphore queue is not empty) {
        semaphore.V();
    }
}这种方案也有两个问题:
- Semaphore 不支持 P, V 以外的操作,如查看队列内容 
- 在 lock.Release 之后,semaphore.P 之前,可能有 Signal 发生,出现 race condition 
实际上我们能够利用 Semaphore 构建 Monitor,在教材上有相关方案,比较复杂,这里不赘述。
Monitor Conclusion
Monitor 将 Semaphore 的两种用法分拆,可以用来表示系统的整体逻辑:
- 必要的时候 Wait 
- 当任意等待中的 threads 可以继续运行时,发送 Signal 
- 每次修改状态变量时,必须先加锁 
monitor-based 程序的基本结构如下:
lock.Acquire();
while (need to wait) {
    condvar.wait();
}
lock.Release();
// do something so no need to wait
lock.Acquire();
condvar.signal();
lock.Release();参考
Last updated
