# Cooperating Threads, Synchronization

本节大纲：

* Synchronization Operations
* Higher-level Synchronization Abstractions
  * Semaphores, monitors, and condition variables
* Programming paradigms for concurrent programs

当我们有了 threads 后，并发地执行 threads 无论在单核还是多核环境下都可以获得比 single thread/process 更多的好处：

* share resources
  * one computer, many users
  * one bank balance, many ATMs
  * embedded systems (robot control: coordinate arm & hand)
* speedup
  * overlap I/O and computation
  * multiprocessor
* modularity
  * chop large problem up into simpler pieces

但天下没有免费的午餐，并发条件下，系统的正确性将面临更大的挑战。想象 thread dispatcher，它可能在任意时刻切换 thread，如果每个 thread 之间共享了数据，则可能出现许多意想不到的场景，产生意料之外的执行结果，由于不同 threads 的执行过程可能按无限种方式交织（interleaving），程序员无法通过测试来发现这样的问题。可以分以下两种场景考虑：

* Independent Threads：threads 之前完全没有状态共享，程序执行过程是确定的，结果是可复现的，scheduling 的顺序无所谓
* Cooperating Threads：threads 之间存在状态共享，程序执行过程不确定，执行结果不可复现，这种情况下出现的 bug 称为 Heisenbugs

但仔细思考，实际上不存在 independent threads，每个 process 都共享文件系统、操作系统资源、网络、以及底下的硬件、电路板，一个 device driver 可能导致 thread A 把 thread B 搞崩溃。本节的核心问题其实就是如何能够正确利用 threads 的并发。

## Real Problems

### ATM Bank Server

ATM Bank Server 负责处理所有 ATM 的存钱取钱请求，如下图所示：

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_kw2UmCB6ek73dY_Ao%2F-L_l0drVRzqHGr82k5lc%2FScreen%20Shot%202019-03-12%20at%204.30.45%20PM.jpg?alt=media\&token=ac8be0a7-4b12-47f2-9e09-5e7907e5d9d8)

假设我们需要为 ATM 实现一个请求处理函数：

```c
BankServer() {
    while (TRUE) {
        ReceiveRequest(&top, &acctId, &amount);
        ProcessRequest(op, acctId, amount);
    }
}

 ProcessRequest(op, acctId, amount) {
     if (op == deposit) Deposit(acctId, amount);
     else if //...
 }
 
 Deposit(acctId, amount) {
     acct = GetAccount(acctId);
     acct->balance += amount;
     StoreAccount(acct);
 }
```

#### Event Driven Version of ATM Server

假设我们只有一个 CPU，或者我们始终只有一个 thread 在处理请求，这样正确性就不是问题：

```c
BankServer() {
    while (TRUE) {
        event = WaitForNextEvent();
        if (event == ATMRequest)
            StartOnRequest();
        else if (event == AcctAvail)
            ContinueRequest();
        else if (event == AcctStored)
            FinishRequest();
    }
}
```

尽管解决了正确性，但：

* 万一遇到 blocking I/O，所有后续请求都在等待
* 为了取得更高的 CPU 利用率，我们需要把业务代码拆得足够细，事件拆得足够细，将 I/O 和 computation 尽可能地 overlap 起来，最终达到提高 cpu 利用率的目的。这种编程形式在图形化交互编程领域使用得较多

#### Cooperating Threads Version of ATM Server

为了保证在 threads 并发下 ATM Server 的正确性，我们希望 Deposit 函数能够具备原子性 --- 要么函数执行完毕，要么未执行，而不能中途被打断，否则可能出现以下的情形：

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_kw2UmCB6ek73dY_Ao%2F-L_l6rHL0BBp-o8v0KZu%2FScreen%20Shot%202019-03-12%20at%204.57.54%20PM.jpg?alt=media\&token=c1c39432-7eed-440f-81a6-9187b6a002ff)

### Therac-25

一个放射形治疗仪器，因为 threads cooperation 的问题造成软件没能准确控制放射剂量，导致多为病人死亡。

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mEC4RqfT7olsjJ5-H%2FScreen%20Shot%202019-03-12%20at%2010.09.33%20PM.jpg?alt=media\&token=973f7406-d3d2-4c9f-900d-d14c6d68e44c)

#### Problem is at the lowest level

在下图所示的两个 threads 中，由于可能的不同执行顺序会导致不同的 x 取值，如 3, 5 等

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_m9bCT_ycqrVAXstIq%2FScreen%20Shot%202019-03-12%20at%209.49.31%20PM.jpg?alt=media\&token=4a40138d-7465-4abc-acd0-46a23f5d8fbf)

那么如下图所示的两个 threads 中，x 的取值可能是多少？

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_m9yQLn7dWvGCvmaGx%2FScreen%20Shot%202019-03-12%20at%209.51.06%20PM.jpg?alt=media\&token=f806c495-5c9b-4925-a27c-ccec0fc61a63)

除了 1、2，是否还有别的可能？

如果是在 serial processors 中执行，thread A 写入 0001，B 写入 0010，调度的顺序是 ABABABBA，每次写入一个 byte 就切换上下文，就有可能得到 3。

因此我们在寻找并发问题的解决方案时，需要了解底层支持的 operations 中，哪些是 indivisible，即 atomic operations。

#### Atomic Operations：an operation that always runs to completion or not at all

atomic operations 是 threads cooperation 的基石。那么哪些 operations 是 atomic？

* memory references and assignments
* load and stores of words：因此在这种情况下，上一个例子中 x = 3 是不可能的

而有些指令是非 atomic 的，如：

* double-precision floating point store
* VAX and IBM 360 had an instruction to copy a whole array

### Too Much Milk

假设你和你的室友共享一个冰箱，冰箱里存放着牛奶，你和你的室友发现冰箱没有牛奶时都会去超时购买牛奶放入冰箱，这时候就可能出现如下事件：

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mFNd8iu4NhWzKsBBb%2FScreen%20Shot%202019-03-12%20at%2010.14.44%20PM.jpg?alt=media\&token=b3777aca-a5e5-493e-8ca7-15c85377795b)

这个问题的解决方案应当实现：

1. 永远不要有多于 1 个人购买牛奶
2. 当没有牛奶时，有人去购买牛奶

#### 方案一：使用便利贴 ❌

```c
if (noMilk) {
    if (noNote) {
        leave Note;
        buy milk;
        reomve note;
    }
}
```

对于人，这没问题；但如果是计算机，仍然有可能出现牛奶过多的情况，两个 threads 都发现没有 milk 和 note 以后，就会都去购买牛奶。

#### 方案二：先留便利贴，再检查牛奶 ❌

```c
leave Note;
if (noMilk) {
    if (noNote) {
        leave Note;
        buy milk;
    }
}
remove Note
```

对于人，这没问题；但如果是计算机，则两个 threads 都不会购买牛奶

#### 方案三：每个人使用自己特有的便利贴 ❌

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mIeES9mZ00reYq0yx%2FScreen%20Shot%202019-03-12%20at%2010.29.02%20PM.jpg?alt=media\&token=909573aa-1834-4c56-9630-bd48ffdc745e)

尽管很罕见，但仍然有可能出现两个 threads 都不会购买牛奶的情况。

#### 方案四：每个人使用自己特有的便利贴2 ✅

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mJ2zFFR0MX38luLfU%2FScreen%20Shot%202019-03-12%20at%2010.30.48%20PM.jpg?alt=media\&token=e88e07ca-96a7-4ff4-8e10-f7d05f49fabc)

仔细推理，考虑各种可能，可以发现这种方案竟然是正确的，但方案有很明显的缺点：

* 即使在解决这种简单的问题都显得很复杂
* Thread A 和 Thread B 的代码不对称，很难推广到更多个 threads 合作的场景
* 当 A 在等待 B 时，会出现 busy-waiting，跟 B 争抢 CPU 时间

#### 方案五：使用锁 ✅

假设计算机硬件为我们提供了比 load/store 更高抽象级别的 primitives --- lock，它支持两种操作：

* Lock.Acquire() - 等待 lock 被释放，就立即获取
* Lock.Release() - 释放锁，唤醒其它正在等待的 thread

lock 保证在同一时间只有一个 thread 可以成功的获取锁，因此 milk problem 的解法就变成：

```c
milklock.Acquire();
if (nomilk)
    buy milk;
milklock.Release();
```

Acquire() 与 Release() 之间的部分称为 "Critical Section"。

## Implement Higher Level Synchronization Primitives: Lock

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mMOKXijLukGsPHNwT%2FScreen%20Shot%202019-03-12%20at%2010.45.22%20PM.jpg?alt=media\&token=cee19e8f-975a-4b42-ad0f-cfb051a7d0c1)

实际上，计算机的硬件层为我们提供了许多 atomic operations：

* Load/Store
* Disable/Enable Interrupts
* Test\&Set
* Comp\&Swap

我们需要用它们来实现 higher-level api，供用户程序使用，使得编写 threads cooperation 代码更加方便。

### How to implement Locks

#### Interrupts

如果直接使用 load/store，我们就来到 milk 问题的方案四，这种方式复杂且容易出错。是否可以使用别的硬件层面的 atomic operations，比如 Interrupts？

最直接的方法就是：

```c
LockAcquire() {
    disable Ints;
}

LockRelease() {
    enable Ints;
}
```

问题在于：

* 不可以让 user 控制 interrupts
* critical sections 的执行时间不可控，因此可能导致系统长时间被困在一个 thread 中

改进方法如下：使用一个 lock 变量，使得该变量同时只能被一个激活

```c
int value = FREE;

Acquire() {
    disable interrupts;
    if (value == BUSY) {
        put thread on wait queue;
        go to sleep();
    } else {
        value = BUSY;
    }
    enable interrupts;
}

Release() {
    disable interrupts;
    if (anyone on wait queue) {
        take thread off wait queue;
        place on ready queue;
    } else {
        value = FREE;
    }
    enable interrupts;
}
```

与之前的解法不同，我们将加锁操作的 critical section 缩短至最小，这些操作必须通过 kernel 来实现，用户不会直接控制 interrupts，同时用户进程在 critical section 中的执行时间可以任意长。

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mP07Va7ox-gvfgp4e%2FScreen%20Shot%202019-03-12%20at%2010.56.49%20PM.jpg?alt=media\&token=0267d883-107f-485c-be78-e9b0c5d0d709)

但从上述解法中，我们还需要在 Acquire 失败的过程中，enable interrupts，否则其他进程将无法调用 Acquire 或 Release，如下图所示：

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mPbhRM_6V-rZL001K%2FScreen%20Shot%202019-03-12%20at%2010.59.27%20PM.jpg?alt=media\&token=4de6a7a7-2456-4caf-8cae-546e1bb3d276)

在图中指定的三个位置插入 enable interrupts 都无法解决问题：

* 位置1：还没把 thread 放到 wait queue 上，导致 wakeUp 信号没有被接收到
* 位置2：thread 还没有 go to sleep 就可能被 wakeUp
* 位置3：thread 已经 sleep，就程序无法执行到这个位置就 sleep 了

正解：将 enable interrupts 的责任丢给下一个被调度的线程：

![](https://1008303647-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-LMjQD5UezC9P8miypMG%2F-L_m7rwAkzwHTW-l0DKk%2F-L_mRUZTk_NOosqMge9x%2FScreen%20Shot%202019-03-12%20at%2011.07.38%20PM.jpg?alt=media\&token=4a776dc0-193c-4f8f-a8e9-7604f1b6b15d)

利用 interrupts 来实现 lock 的缺陷在于：

* 无法在 user-level 完成 Acquire 和 Release
* 在 multiprocessor 环境下，需要对多个 processor 执行 disable interrupts，中间增加了额外的通信成本

#### Atomic Read-Modify-Write Instructions

顾名思义，若不论在单核还是多核环境下，硬件层面能提供 atomic read-modify-write 指令序列，就可以利用它们构建 higher-level api，如：

```c
test&set (&address) {           /* most architecture */
    result = M[address];
    M[address] = 1;
    return result;
}

swap (&address, register) {     /* x86 */
    tmp = M[address];
    M[address] = register;
    register = tmp;
}

compare&swap (&address, reg1, reg2) { /* 68000 */
    if (reg1 == M[address]) {
        M[address] = reg2;
        return access;
    } else {
        return failure;
    }
}

load-linked&store conditional(&address) {
    /* R40000, alpha */
    loop:
        l1 r1, M[address];
        movi r2, 1;
        sc r2, M[address];
        beqz r2, loop;
}
```

#### Implementing Locks With test\&set

```c
int value = 0;
Acquire() {
    while (test&set(value));  // busy waiting
}
Release() {
    value = 0;
}
```

* Pros
  * 不需要使用 interrupts，interrupts 的正常功能不受影响
  * 用户代码可以使用该 lock，无需进入 kernel mode
  * 在多核机器上也能工作
* Cons
  * busy-waiting, inefficient
  * waiting thread may take cycles away from thread
  * priority inversion: if busy-waiting thread has higher priority than thread holding lock => no progress

改进版：

```c
int guard = 0;
int value = FREE;

Acquire() {
    // Short busy-wait time
    while (test&set(guard));
    if (value == BUSY) {
        put thread on wait queue;
        go to sleep() & guard=0;
    } else {
        value = BUSY;
        guard = 0;
    }
}

Release() {
    // Short busy-wait time
    while (test&set(guard));
    if anyone on wait queue {
        take thread off wait queue
        place on ready queue
    } else {
        value = FREE;
    }
    guard = 0;
}
```

## 参考

[slides](https://inst.eecs.berkeley.edu/~cs162/sp15/static/lectures/7.pdf)
