Semaphores, Condition Variables, Readers/Writers

引入

上节介绍了如何通过 Lock 让不同 Threads 正确地合作,从而真正获得并行带来的性能提升,同时也介绍 Lock 的底层实现方法。但直接使用 Lock 容易出错,即便像 UNIX 这样稳定的系统,在其诞生 10 年后,仍然每周都可能因为并发 bug 导致系统崩溃。除了容易出错,Lock 的直接适用场景主要在于保证 critical section 的排他性,即 mutex,但仅有 mutex 并不能优雅地解决实践中遇到的并发编程场景,因此本节将介绍在 Lock 之上抽象出来的更高级别的并发编程原语 (primitive):

  • Semaphore

  • Monitor

Semaphore

Semaphore 是一种特殊的 Lock,由 Dijkstra 在 60 年代末期提出,是早期 UNIX 的并发编程的主要工具。

Semaphore 是一个非负整数,它支持一下两种原子操作:

  • P():每次执行时,等待 Semaphore 为整数时,将它减少 1

  • V():将 Semaphore 增加 1

注:这里的 P 表示 proberen,是测试的意思;V 表示 verhogen,是增加的意思,二者源自荷兰语

Semaphore 主要有两种使用场景:

Mutual Exclusion

当 Semaphore 的初始值为 1 时,它就是一个 Lock,可以用来保护 critical section:

mutual_exclusion.c
semaphore.P();
// Critical section goes here
semaphore.V();

Scheduling Constraints

例如,当 Semaphore 的初始值为 0 时,可以用来实现 ThreadJoin:

semaphore = 0;
ThreadJoin() {
    semaphore.P();
}

ThreadFinish() {
    semaphore.V();
}

Producer Consumer With A Bounded Buffer

问题描述:

  • Producer 将数据放入 shared buffer

  • Consumer 将数据从 shared buffer 中读出

  • 需要协调好 Producer/Consumer 的工作,保证正确的前提下减少 CPU 资源浪费

一个正确的解决方案需要满足:

  • 当 buffer 为空时,Consumer 必须等待 Producer 放入数据后才能工作 (Scheduling Constraint)

  • 当 buffer 已满时,Producer 必须等待 Consumer 消费数据后才能工作 (Scheduling Constraint)

  • 任意时刻只能有一个 thread 操作 buffer (Mutual Exclusion)

可以想象,我们需要使用 3 个 Semaphores,分别用于 Mutual Exclusion 和 Scheduling Constraints:

Semaphore fullBuffers;   // consumer's constraint
Semaphore emptyBuffers;  // producer's constraint
Semaphore mutex;         // mutual exclusion

构建 Producer 与 Consumer 函数:

Semaphore fullBuffer = 0;
Semaphore emptyBuffers = numBuffers;
Semaphore mutex = 1;

Producer(item) {
    emptyBuffers.P();
    mutex.P();
    Enqueue(item);
    mutex.V();
    fullBuffers.V();
}

Consumer() {
    fullBuffers.P();
    mutex.P();
    item = Dequeue();
    mutex.V();
    emptyBuffers.V();
    return item;
}

可以看到这里 Producer 与 Consumer 的逻辑并不对称:

  • Producer: emptyBuffer.P(), fullBuffer.V()

  • Consumer: fullBuffer.P(), emptyBuffer.V()

同时,P 操作的顺序很重要,下面的代码可能造成死锁:

Producer(item) {
    mutex.P();
    emptyBuffers.P();
    Enqueue(item);
    mutex.V();
    fullBuffers.V();
}

但 V 操作的顺序不重要。如果有 N 个 producers,M 个 consumers,以上代码逻辑仍然适用。

Monitor

Semaphore 既可以用于 Mutual Exclusion,又可以用于 Scheduling Constraints,这种设计有时候会让使用者感到不知所措,不符合 "Do one thing and do it well" 的哲学;这也是 Monitor 被提出的目的:

Use locks for mutual exclusion and condition variables for scheduling constraints

Monitor 由两部分构成:

  • 一个 Lock

    • 用于保护 shared data

    • 在获取 shared data 前必须加锁

    • 在使用 shared data 完毕后必须释放锁

    • Initially free

  • 零个或多个 condition variables

    • 每个 condition variable 背后都是一个 threads queue,等待进入某个 critical section

    • 与 Semaphore 不同,使用 condition variable 时,可以在 critical section 中等待资源,自动释放锁,被唤醒时自动加锁

Simple Monitor Example

Lock lock;
Queue queue;

AddToQueue(item) {
    lock.Acquire();
    queue.enqueue(item);
    lock.Release();
}

RemoveFromQueue() {
    lock.Acquire();
    item = queue.dequeue();  // Get next item or null
    lock.Release();
    return(item);            // Might return null
}

本例中仅仅使用了 lock,并未使用 condition variable,尽管它能达到相同的功能,但坏处是:当 queue 中没有数据时,consumer 不会进入等待状态,而是消费到空数据。更合理的逻辑是,当 queue 中没有数据时,consumer 进入等待状态 (sleep),知道 queue 有新的数据时才被唤醒。

Complete Monitor Example

Condition Variable 支持 3 种原子操作:

  • Wait(&lock):释放 lock 的同时进入睡眠状态,唤醒前重新获取 lock

  • Signal():若存在,唤醒一个等待的 thread

  • Broadcast():若存在,唤醒所有等待的 threads

Condition Variable 的使用原则:必须获取 lock 才能执行上述操作。

Lock lock;
Condition dateready;
Queue queue;

AddToQueue(item) {
    lock.Acquire();
    queue.enqueue(item);
    dataready.signal();
    lock.Release();
}

RemoveFromQueue() {
    lock.Acquire();
    while (queue.isEmpty()) {
        dataready.wait(&lock);
    }
    item = queue.dequeue();
    lock.Release();
    return item;
}

这里需要特别关注:

while (queue.isEmpty()) {
    dataready.wait(&lock);
}

为什么不是:

if (queue.isEmpty()) {
    dataready.wait(&lock);
}

这取决于 scheduling 的策略:

  • Hoare-style (most textbooks):

    • Signaler gives lock, CPU to waiter; waiter runs immediately

    • Waiter gives up lock, processor back to signaler when it exits critical section or if it waits again

  • Mesa-style (most real operating systems):

    • Signaler keeps lock and processor

    • Waiter placed on ready queue with no special priority

    • Practically, need to check condition again after wait

Extended Example: Readers/Writers Problem

场景:有一个共享的数据库,有两种用户需要访问它:

  • Readers:只读取数据,不修改数据

  • Writers:读取或修改数据

使用一个全局锁可能导致数据库操作效率不高,我们希望:

  • 多个 Readers 可以同时读取数据

  • 只有一个 Writer 可以操作数据

具体限制如下:

  • Readers 可以在没有 Writers 在等待或者操作时访问数据库

  • Writers 可以在没有其它 Writers 或者 Readers 在访问数据库时访问数据库

  • 每个时刻只有一个 thread 可以修改系统的状态变量 (state variables)

状态变量包含:

  • AR: Number of active readers; initially = 0

  • WR: Number of waiting readers; initially = 0

  • AW: Number of active writers; initially = 0

  • WW: Number of waiting writers; initially = 0

  • Condition: okToRead = NIL

  • Condition: okToWrite = NIL

reader.c
Reader() {
    // First check into system
    lock.Acquire();
    while ((AW + WW) > 0) {
        WR++;
        okToRead.wait(&lock);
        WR--;
    }
    AR++;
    lock.Release();
    
    // Perform actual read-only access
    
    // check out of system
    lock.Acquire();
    AR--;
    if (AR == 0 && WW > 0) {
        okToWrite.signal();
    }
    lock.Release();
}
writer.c
Writer() {
    // First check into system
    lock.Acquire();
    while ((AW+AR) > 0) {
        WW++;
        okToWrite.wait(&lock);
        WW--;
    }
    lock.Release();
    
    // Perform actual read/write access
    
    lock.Acquire();
    AW--;
    if (WW > 0) {
        okToWrite.signal();
    } else if (WR > 0) {
        okToRead.broadcast();
    }
    lock.Release();
}

几个值得思考的问题:

  • Readers 是否有可能被饿死?

  • 如果我们把 reader.c 中的 if (AR == 0 && WW > 0) {} 去掉,会怎样?

  • 如果我们把 reader.c 中的 okToWrite.signal(); 修改成 okToWrite.broadcast(); 会怎样?

利用 Semaphores 构建 Monitors?

上节介绍了 Monitor 由 lock 和 condition 组成,lock 的部分可以直接用 Semaphore = 1 解决,而 Condition 的部分则比较复杂。

V1

Wait() { semaphore.P(); }
Signal() { semaphore.V(); }

这里的问题在于,在 Monitor 语义中,Wait 方法应在 thread 进入睡眠之前释放锁,当前解法会造成死锁。

V2

Wait(Lock lock) { 
    lock.Release();
    semaphore.P();
    lock.Acquire();
}
Signal() { semaphore.V(); }

这里 P、V 是 communicative 的,与 condition 的 Wait 和 Signal 语义不同,Signal 的发生对后续的 Wait 没有影响,而 V 操作对后续的 P 操作则有深远的影响。

V3

Wait(Lock lock) { 
    lock.Release();
    semaphore.P();
    lock.Acquire();
}
Signal() { 
    if (semaphore queue is not empty) {
        semaphore.V();
    }
}

这种方案也有两个问题:

  • Semaphore 不支持 P, V 以外的操作,如查看队列内容

  • 在 lock.Release 之后,semaphore.P 之前,可能有 Signal 发生,出现 race condition

实际上我们能够利用 Semaphore 构建 Monitor,在教材上有相关方案,比较复杂,这里不赘述。

Monitor Conclusion

Monitor 将 Semaphore 的两种用法分拆,可以用来表示系统的整体逻辑:

  • 必要的时候 Wait

  • 当任意等待中的 threads 可以继续运行时,发送 Signal

  • 每次修改状态变量时,必须先加锁

monitor-based 程序的基本结构如下:

lock.Acquire();
while (need to wait) {
    condvar.wait();
}
lock.Release();

// do something so no need to wait

lock.Acquire();
condvar.signal();
lock.Release();

参考

slides 1, slides 2

Last updated