Semaphores, Condition Variables, Readers/Writers
引入
上节介绍了如何通过 Lock 让不同 Threads 正确地合作,从而真正获得并行带来的性能提升,同时也介绍 Lock 的底层实现方法。但直接使用 Lock 容易出错,即便像 UNIX 这样稳定的系统,在其诞生 10 年后,仍然每周都可能因为并发 bug 导致系统崩溃。除了容易出错,Lock 的直接适用场景主要在于保证 critical section 的排他性,即 mutex,但仅有 mutex 并不能优雅地解决实践中遇到的并发编程场景,因此本节将介绍在 Lock 之上抽象出来的更高级别的并发编程原语 (primitive):
Semaphore
Monitor
Semaphore
Semaphore 是一种特殊的 Lock,由 Dijkstra 在 60 年代末期提出,是早期 UNIX 的并发编程的主要工具。
Semaphore 是一个非负整数,它支持一下两种原子操作:
P():每次执行时,等待 Semaphore 为整数时,将它减少 1
V():将 Semaphore 增加 1
注:这里的 P 表示 proberen,是测试的意思;V 表示 verhogen,是增加的意思,二者源自荷兰语
Semaphore 主要有两种使用场景:
Mutual Exclusion
当 Semaphore 的初始值为 1 时,它就是一个 Lock,可以用来保护 critical section:
semaphore.P();
// Critical section goes here
semaphore.V();
Scheduling Constraints
例如,当 Semaphore 的初始值为 0 时,可以用来实现 ThreadJoin:
semaphore = 0;
ThreadJoin() {
semaphore.P();
}
ThreadFinish() {
semaphore.V();
}
Producer Consumer With A Bounded Buffer
问题描述:
Producer 将数据放入 shared buffer
Consumer 将数据从 shared buffer 中读出
需要协调好 Producer/Consumer 的工作,保证正确的前提下减少 CPU 资源浪费
一个正确的解决方案需要满足:
当 buffer 为空时,Consumer 必须等待 Producer 放入数据后才能工作 (Scheduling Constraint)
当 buffer 已满时,Producer 必须等待 Consumer 消费数据后才能工作 (Scheduling Constraint)
任意时刻只能有一个 thread 操作 buffer (Mutual Exclusion)
可以想象,我们需要使用 3 个 Semaphores,分别用于 Mutual Exclusion 和 Scheduling Constraints:
Semaphore fullBuffers; // consumer's constraint
Semaphore emptyBuffers; // producer's constraint
Semaphore mutex; // mutual exclusion
构建 Producer 与 Consumer 函数:
Semaphore fullBuffer = 0;
Semaphore emptyBuffers = numBuffers;
Semaphore mutex = 1;
Producer(item) {
emptyBuffers.P();
mutex.P();
Enqueue(item);
mutex.V();
fullBuffers.V();
}
Consumer() {
fullBuffers.P();
mutex.P();
item = Dequeue();
mutex.V();
emptyBuffers.V();
return item;
}
可以看到这里 Producer 与 Consumer 的逻辑并不对称:
Producer:
emptyBuffer.P(), fullBuffer.V()
Consumer:
fullBuffer.P(), emptyBuffer.V()
同时,P 操作的顺序很重要,下面的代码可能造成死锁:
Producer(item) {
mutex.P();
emptyBuffers.P();
Enqueue(item);
mutex.V();
fullBuffers.V();
}
但 V 操作的顺序不重要。如果有 N 个 producers,M 个 consumers,以上代码逻辑仍然适用。
Monitor
Semaphore 既可以用于 Mutual Exclusion,又可以用于 Scheduling Constraints,这种设计有时候会让使用者感到不知所措,不符合 "Do one thing and do it well" 的哲学;这也是 Monitor 被提出的目的:
Use locks for mutual exclusion and condition variables for scheduling constraints
Monitor 由两部分构成:
一个 Lock
用于保护 shared data
在获取 shared data 前必须加锁
在使用 shared data 完毕后必须释放锁
Initially free
零个或多个 condition variables
每个 condition variable 背后都是一个 threads queue,等待进入某个 critical section
与 Semaphore 不同,使用 condition variable 时,可以在 critical section 中等待资源,自动释放锁,被唤醒时自动加锁
Simple Monitor Example
Lock lock;
Queue queue;
AddToQueue(item) {
lock.Acquire();
queue.enqueue(item);
lock.Release();
}
RemoveFromQueue() {
lock.Acquire();
item = queue.dequeue(); // Get next item or null
lock.Release();
return(item); // Might return null
}
本例中仅仅使用了 lock,并未使用 condition variable,尽管它能达到相同的功能,但坏处是:当 queue 中没有数据时,consumer 不会进入等待状态,而是消费到空数据。更合理的逻辑是,当 queue 中没有数据时,consumer 进入等待状态 (sleep),知道 queue 有新的数据时才被唤醒。
Complete Monitor Example
Condition Variable 支持 3 种原子操作:
Wait(&lock):释放 lock 的同时进入睡眠状态,唤醒前重新获取 lock
Signal():若存在,唤醒一个等待的 thread
Broadcast():若存在,唤醒所有等待的 threads
Condition Variable 的使用原则:必须获取 lock 才能执行上述操作。
Lock lock;
Condition dateready;
Queue queue;
AddToQueue(item) {
lock.Acquire();
queue.enqueue(item);
dataready.signal();
lock.Release();
}
RemoveFromQueue() {
lock.Acquire();
while (queue.isEmpty()) {
dataready.wait(&lock);
}
item = queue.dequeue();
lock.Release();
return item;
}
这里需要特别关注:
while (queue.isEmpty()) {
dataready.wait(&lock);
}
为什么不是:
if (queue.isEmpty()) {
dataready.wait(&lock);
}
这取决于 scheduling 的策略:
Hoare-style (most textbooks):
Signaler gives lock, CPU to waiter; waiter runs immediately
Waiter gives up lock, processor back to signaler when it exits critical section or if it waits again
Mesa-style (most real operating systems):
Signaler keeps lock and processor
Waiter placed on ready queue with no special priority
Practically, need to check condition again after wait
Extended Example: Readers/Writers Problem

场景:有一个共享的数据库,有两种用户需要访问它:
Readers:只读取数据,不修改数据
Writers:读取或修改数据
使用一个全局锁可能导致数据库操作效率不高,我们希望:
多个 Readers 可以同时读取数据
只有一个 Writer 可以操作数据
具体限制如下:
Readers 可以在没有 Writers 在等待或者操作时访问数据库
Writers 可以在没有其它 Writers 或者 Readers 在访问数据库时访问数据库
每个时刻只有一个 thread 可以修改系统的状态变量 (state variables)
状态变量包含:
AR: Number of active readers; initially = 0
WR: Number of waiting readers; initially = 0
AW: Number of active writers; initially = 0
WW: Number of waiting writers; initially = 0
Condition: okToRead = NIL
Condition: okToWrite = NIL
Reader() {
// First check into system
lock.Acquire();
while ((AW + WW) > 0) {
WR++;
okToRead.wait(&lock);
WR--;
}
AR++;
lock.Release();
// Perform actual read-only access
// check out of system
lock.Acquire();
AR--;
if (AR == 0 && WW > 0) {
okToWrite.signal();
}
lock.Release();
}
Writer() {
// First check into system
lock.Acquire();
while ((AW+AR) > 0) {
WW++;
okToWrite.wait(&lock);
WW--;
}
lock.Release();
// Perform actual read/write access
lock.Acquire();
AW--;
if (WW > 0) {
okToWrite.signal();
} else if (WR > 0) {
okToRead.broadcast();
}
lock.Release();
}
几个值得思考的问题:
Readers 是否有可能被饿死?
如果我们把 reader.c 中的
if (AR == 0 && WW > 0) {}
去掉,会怎样?如果我们把 reader.c 中的
okToWrite.signal();
修改成okToWrite.broadcast();
会怎样?
利用 Semaphores 构建 Monitors?
上节介绍了 Monitor 由 lock 和 condition 组成,lock 的部分可以直接用 Semaphore = 1 解决,而 Condition 的部分则比较复杂。
V1
Wait() { semaphore.P(); }
Signal() { semaphore.V(); }
这里的问题在于,在 Monitor 语义中,Wait 方法应在 thread 进入睡眠之前释放锁,当前解法会造成死锁。
V2
Wait(Lock lock) {
lock.Release();
semaphore.P();
lock.Acquire();
}
Signal() { semaphore.V(); }
这里 P、V 是 communicative 的,与 condition 的 Wait 和 Signal 语义不同,Signal 的发生对后续的 Wait 没有影响,而 V 操作对后续的 P 操作则有深远的影响。
V3
Wait(Lock lock) {
lock.Release();
semaphore.P();
lock.Acquire();
}
Signal() {
if (semaphore queue is not empty) {
semaphore.V();
}
}
这种方案也有两个问题:
Semaphore 不支持 P, V 以外的操作,如查看队列内容
在 lock.Release 之后,semaphore.P 之前,可能有 Signal 发生,出现 race condition
实际上我们能够利用 Semaphore 构建 Monitor,在教材上有相关方案,比较复杂,这里不赘述。
Monitor Conclusion
Monitor 将 Semaphore 的两种用法分拆,可以用来表示系统的整体逻辑:
必要的时候 Wait
当任意等待中的 threads 可以继续运行时,发送 Signal
每次修改状态变量时,必须先加锁
monitor-based 程序的基本结构如下:
lock.Acquire();
while (need to wait) {
condvar.wait();
}
lock.Release();
// do something so no need to wait
lock.Acquire();
condvar.signal();
lock.Release();
参考
Last updated