Cooperating Threads, Synchronization
本节大纲:
Synchronization Operations
Higher-level Synchronization Abstractions
Semaphores, monitors, and condition variables
Programming paradigms for concurrent programs
当我们有了 threads 后,并发地执行 threads 无论在单核还是多核环境下都可以获得比 single thread/process 更多的好处:
share resources
one computer, many users
one bank balance, many ATMs
embedded systems (robot control: coordinate arm & hand)
speedup
overlap I/O and computation
multiprocessor
modularity
chop large problem up into simpler pieces
但天下没有免费的午餐,并发条件下,系统的正确性将面临更大的挑战。想象 thread dispatcher,它可能在任意时刻切换 thread,如果每个 thread 之间共享了数据,则可能出现许多意想不到的场景,产生意料之外的执行结果,由于不同 threads 的执行过程可能按无限种方式交织(interleaving),程序员无法通过测试来发现这样的问题。可以分以下两种场景考虑:
Independent Threads:threads 之前完全没有状态共享,程序执行过程是确定的,结果是可复现的,scheduling 的顺序无所谓
Cooperating Threads:threads 之间存在状态共享,程序执行过程不确定,执行结果不可复现,这种情况下出现的 bug 称为 Heisenbugs
但仔细思考,实际上不存在 independent threads,每个 process 都共享文件系统、操作系统资源、网络、以及底下的硬件、电路板,一个 device driver 可能导致 thread A 把 thread B 搞崩溃。本节的核心问题其实就是如何能够正确利用 threads 的并发。
Real Problems
ATM Bank Server
ATM Bank Server 负责处理所有 ATM 的存钱取钱请求,如下图所示:

假设我们需要为 ATM 实现一个请求处理函数:
BankServer() {
while (TRUE) {
ReceiveRequest(&top, &acctId, &amount);
ProcessRequest(op, acctId, amount);
}
}
ProcessRequest(op, acctId, amount) {
if (op == deposit) Deposit(acctId, amount);
else if //...
}
Deposit(acctId, amount) {
acct = GetAccount(acctId);
acct->balance += amount;
StoreAccount(acct);
}
Event Driven Version of ATM Server
假设我们只有一个 CPU,或者我们始终只有一个 thread 在处理请求,这样正确性就不是问题:
BankServer() {
while (TRUE) {
event = WaitForNextEvent();
if (event == ATMRequest)
StartOnRequest();
else if (event == AcctAvail)
ContinueRequest();
else if (event == AcctStored)
FinishRequest();
}
}
尽管解决了正确性,但:
万一遇到 blocking I/O,所有后续请求都在等待
为了取得更高的 CPU 利用率,我们需要把业务代码拆得足够细,事件拆得足够细,将 I/O 和 computation 尽可能地 overlap 起来,最终达到提高 cpu 利用率的目的。这种编程形式在图形化交互编程领域使用得较多
Cooperating Threads Version of ATM Server
为了保证在 threads 并发下 ATM Server 的正确性,我们希望 Deposit 函数能够具备原子性 --- 要么函数执行完毕,要么未执行,而不能中途被打断,否则可能出现以下的情形:

Therac-25
一个放射形治疗仪器,因为 threads cooperation 的问题造成软件没能准确控制放射剂量,导致多为病人死亡。

Problem is at the lowest level
在下图所示的两个 threads 中,由于可能的不同执行顺序会导致不同的 x 取值,如 3, 5 等

那么如下图所示的两个 threads 中,x 的取值可能是多少?

除了 1、2,是否还有别的可能?
如果是在 serial processors 中执行,thread A 写入 0001,B 写入 0010,调度的顺序是 ABABABBA,每次写入一个 byte 就切换上下文,就有可能得到 3。
因此我们在寻找并发问题的解决方案时,需要了解底层支持的 operations 中,哪些是 indivisible,即 atomic operations。
Atomic Operations:an operation that always runs to completion or not at all
atomic operations 是 threads cooperation 的基石。那么哪些 operations 是 atomic?
memory references and assignments
load and stores of words:因此在这种情况下,上一个例子中 x = 3 是不可能的
而有些指令是非 atomic 的,如:
double-precision floating point store
VAX and IBM 360 had an instruction to copy a whole array
Too Much Milk
假设你和你的室友共享一个冰箱,冰箱里存放着牛奶,你和你的室友发现冰箱没有牛奶时都会去超时购买牛奶放入冰箱,这时候就可能出现如下事件:

这个问题的解决方案应当实现:
永远不要有多于 1 个人购买牛奶
当没有牛奶时,有人去购买牛奶
方案一:使用便利贴 ❌
if (noMilk) {
if (noNote) {
leave Note;
buy milk;
reomve note;
}
}
对于人,这没问题;但如果是计算机,仍然有可能出现牛奶过多的情况,两个 threads 都发现没有 milk 和 note 以后,就会都去购买牛奶。
方案二:先留便利贴,再检查牛奶 ❌
leave Note;
if (noMilk) {
if (noNote) {
leave Note;
buy milk;
}
}
remove Note
对于人,这没问题;但如果是计算机,则两个 threads 都不会购买牛奶
方案三:每个人使用自己特有的便利贴 ❌

尽管很罕见,但仍然有可能出现两个 threads 都不会购买牛奶的情况。
方案四:每个人使用自己特有的便利贴2 ✅

仔细推理,考虑各种可能,可以发现这种方案竟然是正确的,但方案有很明显的缺点:
即使在解决这种简单的问题都显得很复杂
Thread A 和 Thread B 的代码不对称,很难推广到更多个 threads 合作的场景
当 A 在等待 B 时,会出现 busy-waiting,跟 B 争抢 CPU 时间
方案五:使用锁 ✅
假设计算机硬件为我们提供了比 load/store 更高抽象级别的 primitives --- lock,它支持两种操作:
Lock.Acquire() - 等待 lock 被释放,就立即获取
Lock.Release() - 释放锁,唤醒其它正在等待的 thread
lock 保证在同一时间只有一个 thread 可以成功的获取锁,因此 milk problem 的解法就变成:
milklock.Acquire();
if (nomilk)
buy milk;
milklock.Release();
Acquire() 与 Release() 之间的部分称为 "Critical Section"。
Implement Higher Level Synchronization Primitives: Lock

实际上,计算机的硬件层为我们提供了许多 atomic operations:
Load/Store
Disable/Enable Interrupts
Test&Set
Comp&Swap
我们需要用它们来实现 higher-level api,供用户程序使用,使得编写 threads cooperation 代码更加方便。
How to implement Locks
Interrupts
如果直接使用 load/store,我们就来到 milk 问题的方案四,这种方式复杂且容易出错。是否可以使用别的硬件层面的 atomic operations,比如 Interrupts?
最直接的方法就是:
LockAcquire() {
disable Ints;
}
LockRelease() {
enable Ints;
}
问题在于:
不可以让 user 控制 interrupts
critical sections 的执行时间不可控,因此可能导致系统长时间被困在一个 thread 中
改进方法如下:使用一个 lock 变量,使得该变量同时只能被一个激活
int value = FREE;
Acquire() {
disable interrupts;
if (value == BUSY) {
put thread on wait queue;
go to sleep();
} else {
value = BUSY;
}
enable interrupts;
}
Release() {
disable interrupts;
if (anyone on wait queue) {
take thread off wait queue;
place on ready queue;
} else {
value = FREE;
}
enable interrupts;
}
与之前的解法不同,我们将加锁操作的 critical section 缩短至最小,这些操作必须通过 kernel 来实现,用户不会直接控制 interrupts,同时用户进程在 critical section 中的执行时间可以任意长。

但从上述解法中,我们还需要在 Acquire 失败的过程中,enable interrupts,否则其他进程将无法调用 Acquire 或 Release,如下图所示:

在图中指定的三个位置插入 enable interrupts 都无法解决问题:
位置1:还没把 thread 放到 wait queue 上,导致 wakeUp 信号没有被接收到
位置2:thread 还没有 go to sleep 就可能被 wakeUp
位置3:thread 已经 sleep,就程序无法执行到这个位置就 sleep 了
正解:将 enable interrupts 的责任丢给下一个被调度的线程:

利用 interrupts 来实现 lock 的缺陷在于:
无法在 user-level 完成 Acquire 和 Release
在 multiprocessor 环境下,需要对多个 processor 执行 disable interrupts,中间增加了额外的通信成本
Atomic Read-Modify-Write Instructions
顾名思义,若不论在单核还是多核环境下,硬件层面能提供 atomic read-modify-write 指令序列,就可以利用它们构建 higher-level api,如:
test&set (&address) { /* most architecture */
result = M[address];
M[address] = 1;
return result;
}
swap (&address, register) { /* x86 */
tmp = M[address];
M[address] = register;
register = tmp;
}
compare&swap (&address, reg1, reg2) { /* 68000 */
if (reg1 == M[address]) {
M[address] = reg2;
return access;
} else {
return failure;
}
}
load-linked&store conditional(&address) {
/* R40000, alpha */
loop:
l1 r1, M[address];
movi r2, 1;
sc r2, M[address];
beqz r2, loop;
}
Implementing Locks With test&set
int value = 0;
Acquire() {
while (test&set(value)); // busy waiting
}
Release() {
value = 0;
}
Pros
不需要使用 interrupts,interrupts 的正常功能不受影响
用户代码可以使用该 lock,无需进入 kernel mode
在多核机器上也能工作
Cons
busy-waiting, inefficient
waiting thread may take cycles away from thread
priority inversion: if busy-waiting thread has higher priority than thread holding lock => no progress
改进版:
int guard = 0;
int value = FREE;
Acquire() {
// Short busy-wait time
while (test&set(guard));
if (value == BUSY) {
put thread on wait queue;
go to sleep() & guard=0;
} else {
value = BUSY;
guard = 0;
}
}
Release() {
// Short busy-wait time
while (test&set(guard));
if anyone on wait queue {
take thread off wait queue
place on ready queue
} else {
value = FREE;
}
guard = 0;
}
参考
Last updated