Cooperating Threads, Synchronization

本节大纲:

  • Synchronization Operations

  • Higher-level Synchronization Abstractions

    • Semaphores, monitors, and condition variables

  • Programming paradigms for concurrent programs

当我们有了 threads 后,并发地执行 threads 无论在单核还是多核环境下都可以获得比 single thread/process 更多的好处:

  • share resources

    • one computer, many users

    • one bank balance, many ATMs

    • embedded systems (robot control: coordinate arm & hand)

  • speedup

    • overlap I/O and computation

    • multiprocessor

  • modularity

    • chop large problem up into simpler pieces

但天下没有免费的午餐,并发条件下,系统的正确性将面临更大的挑战。想象 thread dispatcher,它可能在任意时刻切换 thread,如果每个 thread 之间共享了数据,则可能出现许多意想不到的场景,产生意料之外的执行结果,由于不同 threads 的执行过程可能按无限种方式交织(interleaving),程序员无法通过测试来发现这样的问题。可以分以下两种场景考虑:

  • Independent Threads:threads 之前完全没有状态共享,程序执行过程是确定的,结果是可复现的,scheduling 的顺序无所谓

  • Cooperating Threads:threads 之间存在状态共享,程序执行过程不确定,执行结果不可复现,这种情况下出现的 bug 称为 Heisenbugs

但仔细思考,实际上不存在 independent threads,每个 process 都共享文件系统、操作系统资源、网络、以及底下的硬件、电路板,一个 device driver 可能导致 thread A 把 thread B 搞崩溃。本节的核心问题其实就是如何能够正确利用 threads 的并发。

Real Problems

ATM Bank Server

ATM Bank Server 负责处理所有 ATM 的存钱取钱请求,如下图所示:

假设我们需要为 ATM 实现一个请求处理函数:

BankServer() {
while (TRUE) {
ReceiveRequest(&top, &acctId, &amount);
ProcessRequest(op, acctId, amount);
}
}
ProcessRequest(op, acctId, amount) {
if (op == deposit) Deposit(acctId, amount);
else if //...
}
Deposit(acctId, amount) {
acct = GetAccount(acctId);
acct->balance += amount;
StoreAccount(acct);
}

Event Driven Version of ATM Server

假设我们只有一个 CPU,或者我们始终只有一个 thread 在处理请求,这样正确性就不是问题:

BankServer() {
while (TRUE) {
event = WaitForNextEvent();
if (event == ATMRequest)
StartOnRequest();
else if (event == AcctAvail)
ContinueRequest();
else if (event == AcctStored)
FinishRequest();
}
}

尽管解决了正确性,但:

  • 万一遇到 blocking I/O,所有后续请求都在等待

  • 为了取得更高的 CPU 利用率,我们需要把业务代码拆得足够细,事件拆得足够细,将 I/O 和 computation 尽可能地 overlap 起来,最终达到提高 cpu 利用率的目的。这种编程形式在图形化交互编程领域使用得较多

Cooperating Threads Version of ATM Server

为了保证在 threads 并发下 ATM Server 的正确性,我们希望 Deposit 函数能够具备原子性 --- 要么函数执行完毕,要么未执行,而不能中途被打断,否则可能出现以下的情形:

Therac-25

一个放射形治疗仪器,因为 threads cooperation 的问题造成软件没能准确控制放射剂量,导致多为病人死亡。

Problem is at the lowest level

在下图所示的两个 threads 中,由于可能的不同执行顺序会导致不同的 x 取值,如 3, 5 等

那么如下图所示的两个 threads 中,x 的取值可能是多少?

除了 1、2,是否还有别的可能?

如果是在 serial processors 中执行,thread A 写入 0001,B 写入 0010,调度的顺序是 ABABABBA,每次写入一个 byte 就切换上下文,就有可能得到 3。

因此我们在寻找并发问题的解决方案时,需要了解底层支持的 operations 中,哪些是 indivisible,即 atomic operations。

Atomic Operations:an operation that always runs to completion or not at all

atomic operations 是 threads cooperation 的基石。那么哪些 operations 是 atomic?

  • memory references and assignments

  • load and stores of words:因此在这种情况下,上一个例子中 x = 3 是不可能的

而有些指令是非 atomic 的,如:

  • double-precision floating point store

  • VAX and IBM 360 had an instruction to copy a whole array

Too Much Milk

假设你和你的室友共享一个冰箱,冰箱里存放着牛奶,你和你的室友发现冰箱没有牛奶时都会去超时购买牛奶放入冰箱,这时候就可能出现如下事件:

这个问题的解决方案应当实现:

  1. 永远不要有多于 1 个人购买牛奶

  2. 当没有牛奶时,有人去购买牛奶

方案一:使用便利贴 ❌

if (noMilk) {
if (noNote) {
leave Note;
buy milk;
reomve note;
}
}

对于人,这没问题;但如果是计算机,仍然有可能出现牛奶过多的情况,两个 threads 都发现没有 milk 和 note 以后,就会都去购买牛奶。

方案二:先留便利贴,再检查牛奶 ❌

leave Note;
if (noMilk) {
if (noNote) {
leave Note;
buy milk;
}
}
remove Note

对于人,这没问题;但如果是计算机,则两个 threads 都不会购买牛奶

方案三:每个人使用自己特有的便利贴 ❌

尽管很罕见,但仍然有可能出现两个 threads 都不会购买牛奶的情况。

方案四:每个人使用自己特有的便利贴2 ✅

仔细推理,考虑各种可能,可以发现这种方案竟然是正确的,但方案有很明显的缺点:

  • 即使在解决这种简单的问题都显得很复杂

  • Thread A 和 Thread B 的代码不对称,很难推广到更多个 threads 合作的场景

  • 当 A 在等待 B 时,会出现 busy-waiting,跟 B 争抢 CPU 时间

方案五:使用锁 ✅

假设计算机硬件为我们提供了比 load/store 更高抽象级别的 primitives --- lock,它支持两种操作:

  • Lock.Acquire() - 等待 lock 被释放,就立即获取

  • Lock.Release() - 释放锁,唤醒其它正在等待的 thread

lock 保证在同一时间只有一个 thread 可以成功的获取锁,因此 milk problem 的解法就变成:

milklock.Acquire();
if (nomilk)
buy milk;
milklock.Release();

Acquire() 与 Release() 之间的部分称为 "Critical Section"。

Implement Higher Level Synchronization Primitives: Lock

实际上,计算机的硬件层为我们提供了许多 atomic operations:

  • Load/Store

  • Disable/Enable Interrupts

  • Test&Set

  • Comp&Swap

我们需要用它们来实现 higher-level api,供用户程序使用,使得编写 threads cooperation 代码更加方便。

How to implement Locks

Interrupts

如果直接使用 load/store,我们就来到 milk 问题的方案四,这种方式复杂且容易出错。是否可以使用别的硬件层面的 atomic operations,比如 Interrupts?

最直接的方法就是:

LockAcquire() {
disable Ints;
}
LockRelease() {
enable Ints;
}

问题在于:

  • 不可以让 user 控制 interrupts

  • critical sections 的执行时间不可控,因此可能导致系统长时间被困在一个 thread 中

改进方法如下:使用一个 lock 变量,使得该变量同时只能被一个激活

int value = FREE;
Acquire() {
disable interrupts;
if (value == BUSY) {
put thread on wait queue;
go to sleep();
} else {
value = BUSY;
}
enable interrupts;
}
Release() {
disable interrupts;
if (anyone on wait queue) {
take thread off wait queue;
place on ready queue;
} else {
value = FREE;
}
enable interrupts;
}

与之前的解法不同,我们将加锁操作的 critical section 缩短至最小,这些操作必须通过 kernel 来实现,用户不会直接控制 interrupts,同时用户进程在 critical section 中的执行时间可以任意长。

但从上述解法中,我们还需要在 Acquire 失败的过程中,enable interrupts,否则其他进程将无法调用 Acquire 或 Release,如下图所示:

在图中指定的三个位置插入 enable interrupts 都无法解决问题:

  • 位置1:还没把 thread 放到 wait queue 上,导致 wakeUp 信号没有被接收到

  • 位置2:thread 还没有 go to sleep 就可能被 wakeUp

  • 位置3:thread 已经 sleep,就程序无法执行到这个位置就 sleep 了

正解:将 enable interrupts 的责任丢给下一个被调度的线程:

利用 interrupts 来实现 lock 的缺陷在于:

  • 无法在 user-level 完成 Acquire 和 Release

  • 在 multiprocessor 环境下,需要对多个 processor 执行 disable interrupts,中间增加了额外的通信成本

Atomic Read-Modify-Write Instructions

顾名思义,若不论在单核还是多核环境下,硬件层面能提供 atomic read-modify-write 指令序列,就可以利用它们构建 higher-level api,如:

test&set (&address) { /* most architecture */
result = M[address];
M[address] = 1;
return result;
}
swap (&address, register) { /* x86 */
tmp = M[address];
M[address] = register;
register = tmp;
}
compare&swap (&address, reg1, reg2) { /* 68000 */
if (reg1 == M[address]) {
M[address] = reg2;
return access;
} else {
return failure;
}
}
load-linked&store conditional(&address) {
/* R40000, alpha */
loop:
l1 r1, M[address];
movi r2, 1;
sc r2, M[address];
beqz r2, loop;
}

Implementing Locks With test&set

int value = 0;
Acquire() {
while (test&set(value)); // busy waiting
}
Release() {
value = 0;
}
  • Pros

    • 不需要使用 interrupts,interrupts 的正常功能不受影响

    • 用户代码可以使用该 lock,无需进入 kernel mode

    • 在多核机器上也能工作

  • Cons

    • busy-waiting, inefficient

    • waiting thread may take cycles away from thread

    • priority inversion: if busy-waiting thread has higher priority than thread holding lock => no progress

改进版:

int guard = 0;
int value = FREE;
Acquire() {
// Short busy-wait time
while (test&set(guard));
if (value == BUSY) {
put thread on wait queue;
go to sleep() & guard=0;
} else {
value = BUSY;
guard = 0;
}
}
Release() {
// Short busy-wait time
while (test&set(guard));
if anyone on wait queue {
take thread off wait queue
place on ready queue
} else {
value = FREE;
}
guard = 0;
}

参考

slides