# Semaphores, Condition Variables, Readers/Writers

## 引入

上节介绍了如何通过 Lock 让不同 Threads 正确地合作，从而真正获得并行带来的性能提升，同时也介绍 Lock 的底层实现方法。但直接使用 Lock 容易出错，即便像 UNIX 这样稳定的系统，在其诞生 10 年后，仍然每周都可能因为并发 bug 导致系统崩溃。除了容易出错，Lock 的直接适用场景主要在于保证 critical section 的排他性，即 mutex，但仅有 mutex 并不能优雅地解决实践中遇到的并发编程场景，因此本节将介绍在 Lock 之上抽象出来的更高级别的并发编程原语 (primitive)：

* Semaphore
* Monitor

## Semaphore

Semaphore 是一种特殊的 Lock，由 Dijkstra 在 60 年代末期提出，是早期 UNIX 的并发编程的主要工具。

> Semaphore 是一个非负整数，它支持一下两种原子操作：
>
> * P()：每次执行时，等待 Semaphore 为整数时，将它减少 1
> * V()：将 Semaphore 增加 1
>
> 注：这里的 P 表示 proberen，是测试的意思；V 表示 verhogen，是增加的意思，二者源自荷兰语

Semaphore 主要有两种使用场景：

#### Mutual Exclusion

当 Semaphore 的初始值为 1 时，它就是一个 Lock，可以用来保护 critical section：

{% code title="mutual\_exclusion.c" %}

```c
semaphore.P();
// Critical section goes here
semaphore.V();
```

{% endcode %}

#### Scheduling Constraints&#x20;

例如，当 Semaphore 的初始值为 0 时，可以用来实现 ThreadJoin：

```c
semaphore = 0;
ThreadJoin() {
    semaphore.P();
}

ThreadFinish() {
    semaphore.V();
}
```

### Producer Consumer With A Bounded Buffer

问题描述：

* Producer 将数据放入 shared buffer
* Consumer 将数据从 shared buffer 中读出
* 需要协调好 Producer/Consumer 的工作，保证正确的前提下减少 CPU 资源浪费

一个正确的解决方案需要满足：

* 当 buffer 为空时，Consumer 必须等待 Producer 放入数据后才能工作 (Scheduling Constraint)
* 当 buffer 已满时，Producer 必须等待 Consumer 消费数据后才能工作 (Scheduling Constraint)
* 任意时刻只能有一个 thread 操作 buffer (Mutual Exclusion)

可以想象，我们需要使用 3 个 Semaphores，分别用于 Mutual Exclusion 和 Scheduling Constraints：

```c
Semaphore fullBuffers;   // consumer's constraint
Semaphore emptyBuffers;  // producer's constraint
Semaphore mutex;         // mutual exclusion
```

构建 Producer 与 Consumer 函数：

```c
Semaphore fullBuffer = 0;
Semaphore emptyBuffers = numBuffers;
Semaphore mutex = 1;

Producer(item) {
    emptyBuffers.P();
    mutex.P();
    Enqueue(item);
    mutex.V();
    fullBuffers.V();
}

Consumer() {
    fullBuffers.P();
    mutex.P();
    item = Dequeue();
    mutex.V();
    emptyBuffers.V();
    return item;
}
```

可以看到这里 Producer 与 Consumer 的逻辑并不对称：

* Producer: `emptyBuffer.P(), fullBuffer.V()`
* Consumer: `fullBuffer.P(), emptyBuffer.V()`

同时，P 操作的顺序很重要，下面的代码可能造成死锁：

```c
Producer(item) {
    mutex.P();
    emptyBuffers.P();
    Enqueue(item);
    mutex.V();
    fullBuffers.V();
}
```

但 V 操作的顺序不重要。如果有 N 个 producers，M 个 consumers，以上代码逻辑仍然适用。

## Monitor

Semaphore 既可以用于 Mutual Exclusion，又可以用于 Scheduling Constraints，这种设计有时候会让使用者感到不知所措，不符合 "Do one thing and do it well" 的哲学；这也是 Monitor 被提出的目的：

> Use locks for mutual exclusion and condition variables for scheduling constraints

Monitor 由两部分构成：

* 一个 Lock
  * 用于保护 shared data
  * 在获取 shared data 前必须加锁
  * 在使用 shared data 完毕后必须释放锁
  * Initially free
* 零个或多个 condition variables
  * 每个 condition variable 背后都是一个 threads queue，等待进入某个 critical section
  * 与 Semaphore 不同，使用 condition variable 时，可以在 critical section 中等待资源，自动释放锁，被唤醒时自动加锁

### Simple Monitor Example

```c
Lock lock;
Queue queue;

AddToQueue(item) {
    lock.Acquire();
    queue.enqueue(item);
    lock.Release();
}

RemoveFromQueue() {
    lock.Acquire();
    item = queue.dequeue();  // Get next item or null
    lock.Release();
    return(item);            // Might return null
}
```

本例中仅仅使用了 lock，并未使用 condition variable，尽管它能达到相同的功能，但坏处是：当 queue 中没有数据时，consumer 不会进入等待状态，而是消费到空数据。更合理的逻辑是，当 queue 中没有数据时，consumer 进入等待状态 (sleep)，知道 queue 有新的数据时才被唤醒。

### Complete Monitor Example

Condition Variable 支持 3 种原子操作：

* Wait(\&lock)：释放 lock 的同时进入睡眠状态，唤醒前重新获取 lock
* Signal()：若存在，唤醒一个等待的 thread
* Broadcast()：若存在，唤醒所有等待的 threads

Condition Variable 的使用原则：必须获取 lock 才能执行上述操作。

```c
Lock lock;
Condition dateready;
Queue queue;

AddToQueue(item) {
    lock.Acquire();
    queue.enqueue(item);
    dataready.signal();
    lock.Release();
}

RemoveFromQueue() {
    lock.Acquire();
    while (queue.isEmpty()) {
        dataready.wait(&lock);
    }
    item = queue.dequeue();
    lock.Release();
    return item;
}
```

这里需要特别关注：

```c
while (queue.isEmpty()) {
    dataready.wait(&lock);
}
```

为什么不是：

```c
if (queue.isEmpty()) {
    dataready.wait(&lock);
}
```

这取决于 scheduling 的策略：

* Hoare-style (most textbooks)：
  * Signaler gives lock, CPU to waiter; waiter runs immediately
  * Waiter gives up lock, processor back to signaler when it exits critical section or if it waits again
* Mesa-style (most real operating systems):
  * Signaler keeps lock and processor
  * Waiter placed on ready queue with no special priority
  * **Practically, need to check condition again after wait**

### **Extended Example: Readers/Writers Problem**

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-LajgfBZBZNcn5-1l_GT%2F-Lajh7JmnVrV91By2DGW%2FScreen%20Shot%202019-03-24%20at%208.37.09%20PM.jpg?alt=media\&token=14a738e3-a698-42b2-b22b-c6d6e858b08f)

场景：有一个共享的数据库，有两种用户需要访问它：

* Readers：只读取数据，不修改数据
* Writers：读取或修改数据

使用一个全局锁可能导致数据库操作效率不高，我们希望：

* 多个 Readers 可以同时读取数据
* 只有一个 Writer 可以操作数据

具体限制如下：

* Readers 可以在没有 Writers 在等待或者操作时访问数据库
* Writers 可以在没有其它 Writers 或者 Readers 在访问数据库时访问数据库
* 每个时刻只有一个 thread 可以修改系统的状态变量 (state variables)

状态变量包含：

* AR: Number of active readers; initially = 0
* WR: Number  of waiting readers; initially = 0
* AW: Number of active writers; initially = 0
* WW: Number of waiting writers; initially = 0
* Condition: okToRead = NIL
* Condition: okToWrite = NIL

{% code title="reader.c" %}

```c
Reader() {
    // First check into system
    lock.Acquire();
    while ((AW + WW) > 0) {
        WR++;
        okToRead.wait(&lock);
        WR--;
    }
    AR++;
    lock.Release();
    
    // Perform actual read-only access
    
    // check out of system
    lock.Acquire();
    AR--;
    if (AR == 0 && WW > 0) {
        okToWrite.signal();
    }
    lock.Release();
}
```

{% endcode %}

{% code title="writer.c" %}

```c
Writer() {
    // First check into system
    lock.Acquire();
    while ((AW+AR) > 0) {
        WW++;
        okToWrite.wait(&lock);
        WW--;
    }
    lock.Release();
    
    // Perform actual read/write access
    
    lock.Acquire();
    AW--;
    if (WW > 0) {
        okToWrite.signal();
    } else if (WR > 0) {
        okToRead.broadcast();
    }
    lock.Release();
}
```

{% endcode %}

几个值得思考的问题：

* Readers 是否有可能被饿死？
* 如果我们把 reader.c 中的 `if (AR == 0 && WW > 0) {}` 去掉，会怎样？
* 如果我们把 reader.c 中的 `okToWrite.signal();` 修改成 `okToWrite.broadcast();` 会怎样？

### 利用 Semaphores 构建 Monitors？

上节介绍了 Monitor 由 lock 和 condition 组成，lock 的部分可以直接用 Semaphore = 1 解决，而 Condition 的部分则比较复杂。

#### V1

```c
Wait() { semaphore.P(); }
Signal() { semaphore.V(); }
```

这里的问题在于，在 Monitor 语义中，Wait 方法应在 thread 进入睡眠之前释放锁，当前解法会造成死锁。

#### V2

```c
Wait(Lock lock) { 
    lock.Release();
    semaphore.P();
    lock.Acquire();
}
Signal() { semaphore.V(); }
```

这里 P、V 是 communicative 的，与 condition 的 Wait 和 Signal 语义不同，Signal 的发生对后续的 Wait 没有影响，而 V 操作对后续的 P 操作则有深远的影响。

#### V3

```c
Wait(Lock lock) { 
    lock.Release();
    semaphore.P();
    lock.Acquire();
}
Signal() { 
    if (semaphore queue is not empty) {
        semaphore.V();
    }
}
```

这种方案也有两个问题：

* Semaphore 不支持 P, V 以外的操作，如查看队列内容
* 在 lock.Release 之后，semaphore.P 之前，可能有 Signal 发生，出现 race condition

实际上我们能够利用 Semaphore 构建 Monitor，在教材上有相关方案，比较复杂，这里不赘述。

### Monitor Conclusion

Monitor 将 Semaphore 的两种用法分拆，可以用来表示系统的整体逻辑：

* 必要的时候 Wait
* 当任意等待中的 threads 可以继续运行时，发送 Signal
* 每次修改状态变量时，必须先加锁

monitor-based 程序的基本结构如下：

```c
lock.Acquire();
while (need to wait) {
    condvar.wait();
}
lock.Release();

// do something so no need to wait

lock.Acquire();
condvar.signal();
lock.Release();
```

## 参考

[slides 1](https://inst.eecs.berkeley.edu/~cs162/sp15/static/lectures/8.pdf), [slides 2](https://inst.eecs.berkeley.edu/~cs162/sp15/static/lectures/9.pdf)
