Cooperating Threads, Synchronization
本节大纲:
- Synchronization Operations 
- Higher-level Synchronization Abstractions - Semaphores, monitors, and condition variables 
 
- Programming paradigms for concurrent programs 
当我们有了 threads 后,并发地执行 threads 无论在单核还是多核环境下都可以获得比 single thread/process 更多的好处:
- share resources - one computer, many users 
- one bank balance, many ATMs 
- embedded systems (robot control: coordinate arm & hand) 
 
- speedup - overlap I/O and computation 
- multiprocessor 
 
- modularity - chop large problem up into simpler pieces 
 
但天下没有免费的午餐,并发条件下,系统的正确性将面临更大的挑战。想象 thread dispatcher,它可能在任意时刻切换 thread,如果每个 thread 之间共享了数据,则可能出现许多意想不到的场景,产生意料之外的执行结果,由于不同 threads 的执行过程可能按无限种方式交织(interleaving),程序员无法通过测试来发现这样的问题。可以分以下两种场景考虑:
- Independent Threads:threads 之前完全没有状态共享,程序执行过程是确定的,结果是可复现的,scheduling 的顺序无所谓 
- Cooperating Threads:threads 之间存在状态共享,程序执行过程不确定,执行结果不可复现,这种情况下出现的 bug 称为 Heisenbugs 
但仔细思考,实际上不存在 independent threads,每个 process 都共享文件系统、操作系统资源、网络、以及底下的硬件、电路板,一个 device driver 可能导致 thread A 把 thread B 搞崩溃。本节的核心问题其实就是如何能够正确利用 threads 的并发。
Real Problems
ATM Bank Server
ATM Bank Server 负责处理所有 ATM 的存钱取钱请求,如下图所示:

假设我们需要为 ATM 实现一个请求处理函数:
BankServer() {
    while (TRUE) {
        ReceiveRequest(&top, &acctId, &amount);
        ProcessRequest(op, acctId, amount);
    }
}
 ProcessRequest(op, acctId, amount) {
     if (op == deposit) Deposit(acctId, amount);
     else if //...
 }
 
 Deposit(acctId, amount) {
     acct = GetAccount(acctId);
     acct->balance += amount;
     StoreAccount(acct);
 }Event Driven Version of ATM Server
假设我们只有一个 CPU,或者我们始终只有一个 thread 在处理请求,这样正确性就不是问题:
BankServer() {
    while (TRUE) {
        event = WaitForNextEvent();
        if (event == ATMRequest)
            StartOnRequest();
        else if (event == AcctAvail)
            ContinueRequest();
        else if (event == AcctStored)
            FinishRequest();
    }
}尽管解决了正确性,但:
- 万一遇到 blocking I/O,所有后续请求都在等待 
- 为了取得更高的 CPU 利用率,我们需要把业务代码拆得足够细,事件拆得足够细,将 I/O 和 computation 尽可能地 overlap 起来,最终达到提高 cpu 利用率的目的。这种编程形式在图形化交互编程领域使用得较多 
Cooperating Threads Version of ATM Server
为了保证在 threads 并发下 ATM Server 的正确性,我们希望 Deposit 函数能够具备原子性 --- 要么函数执行完毕,要么未执行,而不能中途被打断,否则可能出现以下的情形:

Therac-25
一个放射形治疗仪器,因为 threads cooperation 的问题造成软件没能准确控制放射剂量,导致多为病人死亡。

Problem is at the lowest level
在下图所示的两个 threads 中,由于可能的不同执行顺序会导致不同的 x 取值,如 3, 5 等

那么如下图所示的两个 threads 中,x 的取值可能是多少?

除了 1、2,是否还有别的可能?
如果是在 serial processors 中执行,thread A 写入 0001,B 写入 0010,调度的顺序是 ABABABBA,每次写入一个 byte 就切换上下文,就有可能得到 3。
因此我们在寻找并发问题的解决方案时,需要了解底层支持的 operations 中,哪些是 indivisible,即 atomic operations。
Atomic Operations:an operation that always runs to completion or not at all
atomic operations 是 threads cooperation 的基石。那么哪些 operations 是 atomic?
- memory references and assignments 
- load and stores of words:因此在这种情况下,上一个例子中 x = 3 是不可能的 
而有些指令是非 atomic 的,如:
- double-precision floating point store 
- VAX and IBM 360 had an instruction to copy a whole array 
Too Much Milk
假设你和你的室友共享一个冰箱,冰箱里存放着牛奶,你和你的室友发现冰箱没有牛奶时都会去超时购买牛奶放入冰箱,这时候就可能出现如下事件:

这个问题的解决方案应当实现:
- 永远不要有多于 1 个人购买牛奶 
- 当没有牛奶时,有人去购买牛奶 
方案一:使用便利贴 ❌
if (noMilk) {
    if (noNote) {
        leave Note;
        buy milk;
        reomve note;
    }
}对于人,这没问题;但如果是计算机,仍然有可能出现牛奶过多的情况,两个 threads 都发现没有 milk 和 note 以后,就会都去购买牛奶。
方案二:先留便利贴,再检查牛奶 ❌
leave Note;
if (noMilk) {
    if (noNote) {
        leave Note;
        buy milk;
    }
}
remove Note对于人,这没问题;但如果是计算机,则两个 threads 都不会购买牛奶
方案三:每个人使用自己特有的便利贴 ❌

尽管很罕见,但仍然有可能出现两个 threads 都不会购买牛奶的情况。
方案四:每个人使用自己特有的便利贴2 ✅

仔细推理,考虑各种可能,可以发现这种方案竟然是正确的,但方案有很明显的缺点:
- 即使在解决这种简单的问题都显得很复杂 
- Thread A 和 Thread B 的代码不对称,很难推广到更多个 threads 合作的场景 
- 当 A 在等待 B 时,会出现 busy-waiting,跟 B 争抢 CPU 时间 
方案五:使用锁 ✅
假设计算机硬件为我们提供了比 load/store 更高抽象级别的 primitives --- lock,它支持两种操作:
- Lock.Acquire() - 等待 lock 被释放,就立即获取 
- Lock.Release() - 释放锁,唤醒其它正在等待的 thread 
lock 保证在同一时间只有一个 thread 可以成功的获取锁,因此 milk problem 的解法就变成:
milklock.Acquire();
if (nomilk)
    buy milk;
milklock.Release();Acquire() 与 Release() 之间的部分称为 "Critical Section"。
Implement Higher Level Synchronization Primitives: Lock

实际上,计算机的硬件层为我们提供了许多 atomic operations:
- Load/Store 
- Disable/Enable Interrupts 
- Test&Set 
- Comp&Swap 
我们需要用它们来实现 higher-level api,供用户程序使用,使得编写 threads cooperation 代码更加方便。
How to implement Locks
Interrupts
如果直接使用 load/store,我们就来到 milk 问题的方案四,这种方式复杂且容易出错。是否可以使用别的硬件层面的 atomic operations,比如 Interrupts?
最直接的方法就是:
LockAcquire() {
    disable Ints;
}
LockRelease() {
    enable Ints;
}问题在于:
- 不可以让 user 控制 interrupts 
- critical sections 的执行时间不可控,因此可能导致系统长时间被困在一个 thread 中 
改进方法如下:使用一个 lock 变量,使得该变量同时只能被一个激活
int value = FREE;
Acquire() {
    disable interrupts;
    if (value == BUSY) {
        put thread on wait queue;
        go to sleep();
    } else {
        value = BUSY;
    }
    enable interrupts;
}
Release() {
    disable interrupts;
    if (anyone on wait queue) {
        take thread off wait queue;
        place on ready queue;
    } else {
        value = FREE;
    }
    enable interrupts;
}与之前的解法不同,我们将加锁操作的 critical section 缩短至最小,这些操作必须通过 kernel 来实现,用户不会直接控制 interrupts,同时用户进程在 critical section 中的执行时间可以任意长。

但从上述解法中,我们还需要在 Acquire 失败的过程中,enable interrupts,否则其他进程将无法调用 Acquire 或 Release,如下图所示:

在图中指定的三个位置插入 enable interrupts 都无法解决问题:
- 位置1:还没把 thread 放到 wait queue 上,导致 wakeUp 信号没有被接收到 
- 位置2:thread 还没有 go to sleep 就可能被 wakeUp 
- 位置3:thread 已经 sleep,就程序无法执行到这个位置就 sleep 了 
正解:将 enable interrupts 的责任丢给下一个被调度的线程:

利用 interrupts 来实现 lock 的缺陷在于:
- 无法在 user-level 完成 Acquire 和 Release 
- 在 multiprocessor 环境下,需要对多个 processor 执行 disable interrupts,中间增加了额外的通信成本 
Atomic Read-Modify-Write Instructions
顾名思义,若不论在单核还是多核环境下,硬件层面能提供 atomic read-modify-write 指令序列,就可以利用它们构建 higher-level api,如:
test&set (&address) {           /* most architecture */
    result = M[address];
    M[address] = 1;
    return result;
}
swap (&address, register) {     /* x86 */
    tmp = M[address];
    M[address] = register;
    register = tmp;
}
compare&swap (&address, reg1, reg2) { /* 68000 */
    if (reg1 == M[address]) {
        M[address] = reg2;
        return access;
    } else {
        return failure;
    }
}
load-linked&store conditional(&address) {
    /* R40000, alpha */
    loop:
        l1 r1, M[address];
        movi r2, 1;
        sc r2, M[address];
        beqz r2, loop;
}Implementing Locks With test&set
int value = 0;
Acquire() {
    while (test&set(value));  // busy waiting
}
Release() {
    value = 0;
}- Pros - 不需要使用 interrupts,interrupts 的正常功能不受影响 
- 用户代码可以使用该 lock,无需进入 kernel mode 
- 在多核机器上也能工作 
 
- Cons - busy-waiting, inefficient 
- waiting thread may take cycles away from thread 
- priority inversion: if busy-waiting thread has higher priority than thread holding lock => no progress 
 
改进版:
int guard = 0;
int value = FREE;
Acquire() {
    // Short busy-wait time
    while (test&set(guard));
    if (value == BUSY) {
        put thread on wait queue;
        go to sleep() & guard=0;
    } else {
        value = BUSY;
        guard = 0;
    }
}
Release() {
    // Short busy-wait time
    while (test&set(guard));
    if anyone on wait queue {
        take thread off wait queue
        place on ready queue
    } else {
        value = FREE;
    }
    guard = 0;
}参考
Last updated
