Consistency & Consensus

在分布式系统中,任何地方都可以发生故障。解决这些故障的最简单的方法就是让系统崩溃,同时告诉用户系统有问题。如果你想要更友好的解决方案,那么就需要一种能够容错的系统,即能够在部分组件故障的情况下继续工作。

在此回顾一下分布式系统中可能出现的问题:

  • 数据包丢失、乱序、重复、任意延迟

  • time-of-day 时钟可能不准确

  • 节点可能在任意时刻发生暂停甚至崩溃

搭建一个具备容错能力的系统,最有效的方式是设计并实现一层通用的抽象层,隐藏分布式系统潜在的各种问题,提供给应用层使用。就如 TCP 之于 Web 服务,事务之于数据库服务。对于分布式系统来说,最重要的抽象之一就是共识 (consensus),即让所有节点都认同某个事实。在本节后文中,我们会逐渐体会到在不可靠的环境下可靠地达成共识是一件十分困难而复杂的问题。一旦实现了共识,应用就可以利用这层抽象来解决很多问题:如在单领导复制数据库场景中,leader 节点发生崩溃,剩余节点利用共识选举出新的 leader。

在讨论共识问题算法之前,我们需要了解算法的能力上限和下限,在哪些情况下,共识算法能够帮助容错,在哪些情况下不能。

Consistency Guarantees

在复制数据库中,如果你同时访问数据库的两个节点,很可能获得不同的数据,因为写请求到达不同节点的时间不同,不论你使用哪种复制形式(单领导、多领导、无领导),这种数据不一致都必然存在。

大多数复制数据库至少提供最终一致性(eventual consistency)保证,即如果你停止向数据库写入数据并等待一段足够长的时间,最终数据库的不同节点的数据将达成一致。然而,最终一致性是一种非常弱的保证,它并没有规定复制节点达成一致的时限,因此在这种保证下,如果你写入数据之后立马读取它,系统并不能保证你能读到刚刚写入的数据,因为你的读请求可能被路由到与写请求不同的节点。此外,对于应用开发者来说,最终一致性通常更难理解和适应,因为它们更习惯于构建单线程程序,写入的变量应该立即能读到相应值。除了写代码的过程,最终一致性也会给调试代码的过程带来额外的麻烦,因为数据不一致出现的 bug 通常不明显,难以查找、复现,且过程不确定。

在本节,我们将关心更强的数据一致性保证,包含以下话题:

  • 可线性化数据一致性(linearizability)

  • 分布式系统的事件顺序

  • 分布式事务与共识

Linearizability

在使用满足最终一致性的数据库时,如果你向两个复制节点查询相同的数据,可能得到不同的答案,这有时候会令用户感到疑惑。如果数据库能做到从不同节点查询得到的是相同的数据,那么每个客户端就无需关注复制延迟(replication lag)等问题,像使用单机数据库一样使用分布式数据库。

在满足可线性化一致性的数据库中,一旦 client A 成功完成一次写操作,所有其它 clients 都能够看到刚刚写入的数据。要维持这种 “数据只有一份” 的假象,就需要保证数据必须从持有最新版本数据的节点中获取,因此可线性化一致性是一个近因保证 (recency guarantee),举例如下:

在上图中,Bob 在 Alice 之后刷新网站查询比赛结果,却得到与 Alice 不同的结果,这说明该数据库系统不满足可线性化一致性的要求。

What Makes a System Linearizable

上文中提到,可线性化一致性的核心要求是:让系统看起来像只有一份数据。那么,如何才能做到这点?看下面一个例子:

A、B、C 都在访问一个 k/v 数据库,它们访问的方式和时间先后顺序如图所示,这里需要注意的是,我们完全不考虑 k/v 数据库在背后如何运作,而只是以客户端的视角来看待数据库反映给客户端的样子。从图中我们可以从中看出:

  • A 第一次读取 x 发生在 C 写入 x = 1 之前,因此 A 读取到的一定是 0

  • A 最后一次读取 x 发生在 C 写入 x = 1 之后,因此 A 读取到的一定是 1

  • A 的第二次以及 B 的两次读取都与 C 写入 x = 1 的操作在时间上有重叠,因此它们得到的结果既有可能是 0 也有可能是 1

这样的 k/v 数据库满足可线性化一致性吗?我们可以问自己 “这个 k/v 数据库看起来像只有一份数据” 吗?如果一旦读取数据的时间与写入的时间发生重叠,返回的数据就既可能是 0 也可能是 1,我们就可能观察到数据在摇摆,似乎背后有两份或多份数据,每次读取的只是其中一个副本。如果我们观察到的是,在某个时间点之前返回 0,在某个时间点之后返回 1,而不出现摇摆的现象,我们就有理由认为该系统满足可线性化一致性:

上图中,A 的第二次读操作得到 x = 1,而 B 的第二次读操作发生在 A 的第二次读操作之后,因此 B 读出来的 x 必然等于 1。因此,即使 C 的写操作还没彻底完成,只要有一次读操作返回的 x = 1,所有在那之后发生的读操作必定返回 x = 1。

考虑一个更复杂的例子,我们引入一个新的操作 cas(x,vold,vnew)rcas(x, v_{old}, v_{new}) \Rightarrow r ,compare-and-set 原子操作,如果 x 的当前值为 voldv_{old} ,它将被原子地设为 vnewv_{new} ,否则不做处理。例子如下图所示:

在图中,每个操作上都有一个竖线标记,它标记着操作在数据库中完成的时间点,这些竖线在时间轴上严格按顺序排列,永远不会返回,这就保证了所有写操作都会看到上一次写操作的结果,我们来分析几个有趣的时间点:

  • 一开始,B 发送读 x 的请求,但请求可能在网络中被延迟。在 B 发送消息之后, D 发送写入 x = 0 的写请求,且很快被数据库执行,然后 A 发送写入 x = 1 的写请求,同样在 D 的请求执行之后也被执行了。终于 B 的请求最终被执行,读取得到 x = 1。尽管 A、B、D 的请求的执行顺序与发起顺序不同,但请求最终被执行的顺序可以接受,因为这三个请求可以被认为是并发请求,由于网络延迟或其它原因导致发起和执行的顺序不同而已。

  • 尽管 B 的第一次读请求在 A 的第一次写请求之前得到响应,但得到响应的顺序同样不代表数据库执行请求的顺序,因为这两个响应也可以认为是并发响应,由于网络延迟或其它原因导致执行和响应的顺序不同。

  • 图中的模型并不假设任何事务隔离,在某个客户端执行的过程中,其它客户端可任意改变这些数据。如 C 两次读取 x 的过程中,B 通过 cas 操作修改了 x。

  • B 最后一次读取 x 的操作并不满足可线性化一致性,因为在 B 之前 A 已经读取到了 x = 4,B 的读取在 A 的读取操作执行之后才被执行,因此 B 应当得到 x = 4 的响应。

以上就是可线性化一致性背后的直觉:使得系统看起来只有一份数据,系统状态的历史可以被线性地展示,没有分支。可线性化一致性的正式定义比它更加精确。我们甚至可以通过记录一系列请求和响应的时间点检查这些请求是否能够按照一种合法的顺序排列,来测试系统是否满足可线性化一致性。

问题:Linearizability 与 Serializability 有什么区别

Relying on Linearizability

可线性化一致性可能在哪些场景下被使用?

Locking and leader election

单领导复制系统需要保证整个系统中有且仅有一个 leader,一种实现领导选举的方式是使用分布式锁,如 Apache ZooKeeper 和 etcd。无论使用何种实现,这些锁都必须满足可线性化一致性,即所有节点都同意某个节点拥有了该锁。Apache ZooKeepr 和 etcd 背后都使用了共识算法来实现 fault-tolerant 以及可线性化的操作。

Constraints and uniqueness guarantees

关系型数据库通常提供多种数据完整性约束,其中之一就是唯一性约束。例如,一个用户名或者邮箱地址必须唯一地对应一个用户。作为数据库开发者,如果你想实现这种约束,就需要可线性化一致性来解决两个用户并发地创建相同的用户名或邮箱的场景。类似的场景还包括:

  • 保证银行账户不会为负数

  • 不超卖

Cross-channel timing dependencies

回顾本文开始时 Alice 和 Bob 的例子,如果 Alice 没有宣告她看到的比分,Bob 就不会发现它看到的数据是过期的数据,于是一段时间后 Bob 再次刷新页面就能看到正确的最终比分。这里系统的违背可线性化一致性的行为的发现是因为在系统中多出一条额外的信道,即 Alice 和 Bob 之间的信道。我们可以再举一个例子:

在一个网站中,用户在注册时会上传自己的头像,通常网站需要压缩图片大小、调整图片尺寸来方便展示及其它用户下载,因此会有上图中的架构:

  1. 用户上传图片到 Web Server

  2. Web Server 将图片存储进文件存储系统

  3. 将文件地址及压缩、调整尺寸命令发送到消息队列中

  4. Image Resizer 从消息队列中消费消息,执行相关指令

本例中,Web Server 与 Image Resizer 之间就有两条信道,一条通过文件存储,一条通过消息队列。如果文件存储的过程不满足可线性化一致性,就可能出现 Image Resizer 接到消息执行任务时,发现文件不存在的情况。如果我们能限制信道数量:让 Bob 看不到 Alice 的结果,让 Image Resizer 同时负责存储图片、压缩和调整尺寸,同样能保证可线性化一致性。

Implementing Linearizable Systems

在上文中提到,可线性化一致性意味着在用户眼里只有一份数据,且所有对数据的操作都具备原子性。那么实现可线性化最简单的方式就是:真的只保存一份数据。但这种方案无法容错,一旦服务故障,在服务恢复之前数据就无法被获取,甚至恢复后也可能丢失部分数据。解决容错的常见方式就是复制。在 Replication 一节中,我们介绍了 3 种复制方法,我们可以分别来看看是否可以让它们提供可线性化一致性保证:

一般单领导复制 (有可能支持)

在单领导复制系统中,leader 节点中存有数据的主要版本,而 followers 节点中存有数据的复制版本。如果应用只从 leader 节点中读写数据,或从 leader 节点写数据、从 leader 节点以及同步复制 follower 节点中读数据,那么该系统具备支持可线性化一致性的可能性。但现实中单领导复制节点不一定能满足,原因包括:

  • 使用 snapshot isolation,本身就不符合要求

  • 可能存在并发问题:

    • client 认定的 leader 不一定是 leader

    • 异步复制中,故障转移(failover)可能导致已经提交的数据更新丢失,导致同时违反 Durability 和 Linearizability

共识算法下的单领导复制(支持)

共识算法能够解决一般单领导复制面对的 split brain 和 stale replicas 两大问题(下文会介绍),从而支持可线性化一致性。ZooKeeper 和 etcd 的背后正是有了共识算法的支持。

多领导复制(不支持)

多领导复制系统通常不支持可线性化一致性,因为每个节点都可以接收写请求并复制到其它节点上,就可能出现并发写冲突问题,这种冲突使得系统本身看起来就不像只保存一份数据,更不可能让所有操作都具备原子性。

无领导复制(可能不支持)

一些开发者认为只要符合多数条件,即 w+r>nw + r > n ,无领导复制系统就能做到强一致性,即支持 Linearizability,但实际上是否支持需要探究系统多数决策(quorum)的实际配置。Cassandra 中依赖 time-of-day 时钟的 "Last write wins" 写冲突解决显然并不支持可线性化一致性,因为时钟的绝对数值并不代表事件的真实发生顺序。类似地,Sloppy quorums 同样不支持。即使在严格的多数条件下配置下,系统仍然有可能出现不符合可线性化一致性的表现,下文将举例说明。

Linearizability and quorums

是否只要满足 w+r>nw + r > n ,就能保证系统满足可线性化一致性?

在上图中,某系统有 3 个复制节点,节点中存有数据 x 为 0,而一个 writer 要将 x 修改为 1,将这个写请求发到 3 个复制节点中,即 (n = 3, w = 3),在 writer 写入数据的同时,reader A 和 B 先后访问 x,分别向其中 2 个复制节点发送读请求 (r = 2),其请求和响应如图所示,结果 A 读到 x = 1,B 读到 x = 0。这样的结果与 Alice 和 Bob 遇到的场景如出一辙。我们也有办法解决这种问题,就是强制让 reader 执行同步 read repair,让 writer 在写数据之前读取 quorum 的最新状态。然而实际实践中,Riak 为了避免性能下降,使用了异步 read repair;Cassandra 不仅使用异步 read repair,同时也因为 last-write-wins 的写冲突解决方式而无法支持可线性化一致性。

考虑到上述这些情况,还是假设无领导系统不支持可线性化一致性比较合理和安全。

The Cost of Linearizability

可线性化一致性使得系统具有很强的一致性,但天下没有免费的午餐。以一个多 DC 部署的系统为例:

考虑以下场景:DC1 与 DC2 之间的网络发生中断,但每个 DC 内部的服务运行正常,同时用户可以正常访问各自的 DC。 若使用多领导复制系统,即在不同的 DC 分别部署一个 leader 节点,使得用户能够直接访问距其最近的 DC,缩短应用的响应时间,由于数据在 DC 之间的同步是异步进行,因此各 DC 的数据库都能正常运行,只是数据同步的请求被逐渐积压;若使用单领导复制系统,leader 节点只能处于单个 DC 中,那么所有的写请求以及可序列化的读请求都需要被路由到该节点,但由于 DC 间的网络发生中断,这时服务将变得不可用。

The CAP theorem

事实上,刚才的例子跟系统采用什么样的复制方法并没有关系,任何支持可线性化一致性的分布式系统都可能遇到这种问题:

  • 应用需要满足可线性化一致性,如果不同复制节点之间的网络发生中断,服务将变得不可用

  • 应用无需满足可线性化一致性,如果不同复制节点之间的网络发生中断,服务依然可用,但它的行为没有可线性化一致性作保证

这两个观点就是所谓的 CAP 理论。尽管 CAP 分别由 Consistency、Availability 和 Partition Tolerance 构成,但 P 是无法避免的,分布式系统只能够在 C 与 A 之间作取舍,所以 CAP 的名字更应该叫 either Consistent or Available when Partitioned。

尽管 CAP 被世人所熟知,甚至成为面试的常问题目,但实际上它的定义非常狭隘:只考虑一个一致性模型(可线性化一致性)和一种故障模型(网络分区)。其它一致性模型和故障模型并不在讨论范围内。

Linearizability and Network Delays

尽管可线性化一致性对开发者来说是一种很友好的保证,但几乎没有系统会在实践中提供这样的保证。即使处于多核 CPU 环境下的内存也不支持可线性化一致性,因为每个核都有自己的 cache,所有写操作都会先写入 cache,然后被异步地同步到内存中,因此核 A 将数据写入内存中某地址后,核 B 并不能保证看到。多核 CPU 的内存无需容错,因此它不提供可线性化一致性的主要原因还是在性能方面的考虑。在分布式系统中,即便需要支持容错,许多操作系统不提供可线性化一致性的原因同样还是性能。Attiya and Welch 证明了如果分布式系统要提供可线性化一致性保证,该系统的响应时间将至少与网络延迟的不确定性(the uncertainty of delays in the network)成正比。

世上并不存在更快的一致性算法,只存在更弱的一致性模型。后者可以有效降低系统的响应时间。

Ordering Guarantees

在上文中,我们介绍到:可线性化一致性让系统看起来像只有一份数据,且每个写操作都是原子操作。这个定义意味着写操作将被按照一种确定的顺序执行。顺序的概念在本书中不同话题中已经出现多次:

  • Replication:单领导复制的 leader 的存在实际上是为了保证只有一种写操作的执行顺序存在,否则就会出现并发写冲突问题

  • Serializability:可序列化的事务隔离保证系统中的事务执行结果看起来和按照某种顺序依次执行的结果相同

  • Timestamps & Clocks:系统时钟同样也是为了引入某种顺序和历史的概念

似乎在顺序(Ordering)、可线性化(Linearizability)和共识(Consensus)之间有某种内在的联系。本节我们将讨论顺序

Ordering and Causality

顺序很重要的原因在于它保留了数据间的因果关系(Causality)。因果关系暗含了事件发生的顺序,如消息被接收之前必然有消息被发送,系统中的事件通过因果关系关联起来就形成了系统的因果顺序(causal order)。如果系统在任何时刻都遵循因果顺序,我们就称系统为因果一致(causally consitent)。如在 snapshot isolation 中,当你读取了数据库中的某些数据,那么导致这些数据产生的数据应当也能够被读取。

The causal order is not a total order

全序(total order)使得集合中的任意两个元素具备可比性,如自然数集合。但并非所有集合中的元素都满足全序关系,假如我们定义集合(sets)之间的比较函数 f(a,b)=sizeof(a)>sizeof(b)f(a, b) = sizeof(a) > sizeof(b) ,那么含有相同数量元素的集合之间就无法比较,如 {a,b}\{a, b\}{b,c}\{b, c\} 。数学上称这种集合满足偏序(partial order)。在数据一致性模型中,同样可以看到全序与偏序:

  • 可线性化一致性:在可线性化系统中,所有的操作之间拥有绝对的先后顺序,即为全序。任意两个操作事件的发生先后顺序都可以确定,且每个操作都是原子操作。

  • 因果一致性:在因果一致性系统中,没有因果关系的两个事件之间可以认为是并发的,存在因果关系的事件之间才有先后顺序。因果关系为系统事件定义了一个偏序。

在可线化性的数据库中,所有操作一定能被顺序地排列在一条时间线上,即使系统可能同时接到多个请求等待处理,但数据库保证了每个请求都会在一个确定的时间点上被原子地在一份数据上执行,在执行过程中没有任何并发存在。

在因果一致性数据库中,系统的时间线会出现分支与合并的现象,属于不同分支上的操作之间不存在先后顺序,是并发的。想象使用 Git 合作开发时项目的 commit history:每个分支的 commits 之间有绝对的顺序,但不同分支的 commits 之间则无法比较先后顺序,因此在合并时就需要解决写冲突问题。

Linearizability is stronger than causal consistency

可线性化一致性是因果一致性的充分条件,即只要系统满足可线性化一致性,那么它一定满足因果一致性。但在 "The Cost of Linearizability" 一节中,我们看到使得一个系统可线性化将牺牲它的性能和可用性,在系统面临严重的网络延迟时情况尤为严重,因此许多分布式数据库在设计上都放弃了可线性化一致性模型。但仔细思考可以想像:应用程序真正需要的一般只是因果一致性。好消息是,可线性化一致性只是满足因果一致性的一种方式,存在其它既满足因果一致性,又能在面对网络延迟时保持性能且系统可用的解决方案。而这也是新兴数据库系统前进的方向。

Capturing causal dependencies

要确定因果依赖关系,我们需要一种方式来表示系统中单个节点的知识(Knowledge)。如果一个节点在写 Y 之前,已经看到过 X,那么 X 和 Y 就具备因果关联关系。而为了维持因果关系,每个节点在执行一个操作之前,必须保证该操作的因果前向操作执行完毕,如果某个前向操作尚未执行,则必须等待它执行完毕后才能继续执行;或者必须保证该操作在生效之前,所依赖的数据没有发生变化。Replication 一章中的 "Detecting Concurrent Writes" 一节介绍的 version vectors 就是在保证前者;而 Transactions 一节中的 "Serializable Snapshot Isolation" 一节介绍的事务提交前的数据版本检查就属于保证后者。在这两种做法中,数据库都需要额外存储不同版本的数据,来保证因果关系信息不丢失。

Sequence Number Ordering

尽管因果顺序不是全序,但要在实践中跟踪发生在系统中的所有因果关系并非易事。在许多应用程序中,可能在写入数据前读取大量数据,这时候对于数据库系统来说新写入的数据究竟依赖哪些被读取的数据难以确定,且显式地跟踪、记录所有的依赖信息将给系统带来较大成本。

比起全量记录因果顺序,实践中更可取的方式是使用逻辑时钟(logical clock),这种时钟通常由一个自增序列(sequence number)来表示,每启动一个新的操作,则为该操作赋予一个数字,二者一一对应。这种逻辑时钟提供了系统中事件的全序,通过比较操作附带的逻辑时间戳就可以知道操作发生的先后顺序。我们只要保证如果操作 A 在 B 之前发生,那么 A 的逻辑时间戳小于 B 的逻辑时间戳,就能够保证系统的因果顺序同时被记录。

在单领导复制的数据库中,复制日志(replication log)确定了写操作的全序和因果关系。只有 leader 节点可以修改逻辑时钟,为写操作赋予逻辑时间戳。只要 follower 节点就按照复制日志中记录的顺序来执行操作,那么 follower 与 leader 就能保证因果一致:即在 leader 上存在因果关系的两个写操作在 follower 上也必存在因果关系。

Noncausal sequence number generators

如果不是单领导复制系统,则很难产生类似的递增序列。在实践中有很多替代方案,如:

  • 每个节点产生独立的递增序列:

    • 方法1:假设有 n 个节点,那么第 k 个节点产生序列 n×i+k,iNn \times i + k, i \in N

    • 方法2:将每个数字的前 m 个 bits 保留为节点 id

    • 方法3:为每个节点预分配序列号,如节点 A 分配 1 - 1000,节点 B 分配 1001 - 2000

  • 在每个操作后面加上 time-of-day 时钟产生的时间戳,然后利用 last-write-win 策略解决冲突

尽管以上方案在性能、可扩展性上都要更好,然而它们产生的序列并不能保证系统的因果一致性。那么在非单领导复制系统中,是否存在一种方案能够产生保持系统因果一致性的递增序列?

Lamport timestamps

在 1978 年,Leslie Lamport,LaTex 和 paxos 的作者,提出了一种时间戳,能够使得非单领导复制系统保持因果一致性,这种时间戳也被称为 Lamport 时间戳。

如下图所示:

Lamport 时间戳由每个节点的 ID 以及单调递增的计数器值共同构成,用符号可以表示为 (counter, node)。比较任意两个 Lamport 时间戳时,先比较 counter 的大小,若相同再比较 nodeID,Lambda 时间戳为系统提供了全序 (详见 Time, Clocks, and the Ordering of Events in a Distributed System (1978) )。Lamport 时间戳同样是一个逻辑时钟,与 time-of-day 时钟没有关系。而保证因果一致的核心逻辑在于,系统中的每个节点和每个客户端都会保持各自目前所见的最大的 counter 值,当任意节点或客户端看到比自身 counter 更大的值时,会立刻将本地的 counter 更新到该值。比如图中的 Client A,在第二次请求时看到了更大的 counter = 5,就立即更新本地 counter,而当它第三次写数据时,Node 1 也看到了这个值,于是也更新了本地的 counter。只要最大的计数器能跟随每个写操作传递下去,就能够保证系统满足因果一致性。为什么?考虑某个 Client X 的任意两个连续请求 R1 和 R2,它们的 Lamport 时间戳分别为 (counter1, node1)、(counter2, node2),因为是 X 先后发出的请求,counter2 > counter1,因此 R2 的时间戳大于 R1 的时间戳。

Timestamp ordering is not sufficient

尽管 Lamport 时间戳定义了系统中写操作的全序,且满足因果一致性,但它仍然无法有效解决一些分布式系统的常见问题。

考虑场景:两个用户同时想要创建相同名字的账户,且系统不允许重名。那么必然有一个操作成功,另一个失败。你的第一反应可能会觉得 Lamport 时间戳可以解决这个问题,因为每个写操作有绝对的先后顺序,但这前提是每个节点在某时刻都能有全部的信息,因此在每个节点获取所有信息之前,它无法确定写操作应该成功还是失败,甚至如果其它某个节点遇到了网络分区问题,将导致这个决定延迟更长的时间。这样的响应流程和时间对面对用户的系统来说是无法接受的。

这个场景的本质问题在于所有事件(写操作)的全序只有在你获取每个节点上的相关信息后才能确定。如何将事件全序传播到每个节点,就是 total order broadcast 话题所讨论的内容。

Total Order Broadcast

通过前面的讨论可以看出:单领导复制系统通过将所有写操作集中到 leader 节点的单个 CPU 核上,以此获得的系统全序要比非单领导复制系统通过类似 Lamport 时间戳获得的全序要更加强大。前者是全序传播为单向模式,从 leader 节点出发;后者的传播不定向,从任意节点出发,到达任意节点。前者的挑战在于:

  • 如果事件的吞吐量超过单个节点的能力范围如何扩展

  • 如果 leader 节点发生故障,如何恢复系统正常运行

在分布式系统文献中,这个问题被称作 total order broadcast 或者 atomic broadcast。

解决 total order boradcast 问题的方案通常以节点之间的通信协议来定义,这些协议应该满足两点:

  • Reliable Delivery:消息不丢失,且若一个消息被传递到一个节点,该消息就必须被传递到所有节点

  • Totally Ordered Delivery:消息被按照相同的顺序传递到不同的节点

才能保证它的正确性。当然,在网络中断、波动的情况下,消息将无法被传递到部分节点,但算法需要通过不断重试来保证当网络恢复时,消息能被正确传递,保证最终消息可靠到达且顺序一致。

Using Total Order Broadcast

一些共识 (Consensus) 服务如 ZooKeeper 和 Etcd 内部实现了 total order broadcast。实际上 consensus 与 total order broadcast 之间存在着很强的联系。

total order broadcast 可以被用于实现复制系统:如果每个复制节点都按照相同的顺序执行写操作,那么所有复制节点就能保持最终一致,这也被称为 state machine replication。

total order broadcast 可以被用于实现可序列化事务:如果每个事务在每个节点上按确定的顺序执行,不同事务的执行自然可以被序列化。

Implementing linearizable storage using total order broadcast

(ignored)

Implementing total order broadcast using linearizable storage

(ignored)

Distributed Transactions and Consensus

共识问题是分布式计算的最重要、最基础的问题,它的目的就是让多个节点对某个事件达成共识。我们已经有了足够的基础知识:

  • Replication

  • Transactions

  • System Models

  • Linearizability

  • Total Order Broadcast

本节就来尝试解决这个问题。

在实践中有许多使用场景需要让多个节点达成共识,如:

  • Leader Election:单领导复制系统中,所有节点需要都认同某个节点为 leader,当发生节点崩溃或者网络中断时,系统需要能够继续正确运行,不能出现两个 leader 共存的情况。

  • Atomic Commit:在拥有多个节点的数据库上执行事务,可能会出现部分节点操作成功、部分节点操作失败的情况,如果想要保证分布式事务的原子性,就需要让所有节点对事务是否执行达成共识,要么都 commit,要么都 abort/roll back。

本节我们先讨论 atomic commit 问题。以 two-phase commit (2PC) 算法为起点,我们可以进一步探讨更好的共识算法,如 ZooKeeper 中的 Zab 和 Etcd 中的 Raft。

Atomic Commit and Two-Phase Commit (2PC)

在 Transaction 一节中,我们学习了单个节点内部如何保证事务的原子性。进入到分布式多节点环境下,除了保证单个节点上事务的原子性,还需要保证一个分布式事务下所有单节点事务要么同时 commit,要么同时 abort。

2PC & 3PC

参考 2PC & 3PC

Distributed Transactions in Practice

在讨论实践中的分布式事务前,我们需要明确分布式事务的定义。通常分布式事务分为两种:

  • 数据库内部分布式事务 (Database-internal distributed transactions):即在单个数据库的多个节点中实现的事务,如 VoltDB 和 MySQL Cluster's NDB storage engine。在这种情况下,参与事务的每个节点都运行着相同的软件。

  • 异构分布式事务 (Heterogeneous distributed transactions):即在两种或多种不同系统中实现的事务,如不同厂商开发数据库、消息队列等等。

数据库内部分布式事务无需与其它系统保持兼容,可以使用任意协议或优化技术,因此相对来说比较好实现。但异构分布式事务的实现就十分有挑战性。

Exactly-once message processing

要保证消息队列中的每条消息恰好被消费一次,应用需要将消息的确认提交 (message acknowledgment commit)与数据库写放入一个事务中,要么都被 commit、要么都被 abort。

XA transactions

X/Open XA (eXtended Architecture) 是一种实现在异构技术上实现 2PC 的标准。它在 1991 年被提出,并广泛地支持,包括许多传统关系型数据库,如 PostgreSQL、MySQL、DB2、SQL Server 和 Oracle;以及消息中间件,如 ActiveMQ、HornetQ、MSMQ 以及 IBM MQ 等。

XA 并不是一个网络协议,它只是一组供事务 coordinator 调用的 API。应用通过 XA API 访问数据库和消息服务,只要后者支持 XA 协议,应用就能够发送 prepare、commit 以及 abort 指令,实现 2PC 过程。一旦应用崩溃或者宿主机宕机,coordinator 也就随之而去,哪些以及 prepare 完毕但尚未 commit 的 voter 就陷入了存疑 (in doubt) 状态,只能等待 coordinator 恢复后才能继续 2PC 过程,因为 2PC 要求 voter 在 prepare 回复 yes 之后就视为放弃自作主张的权力。

Holding locks while in doubt

在 2PC 过程中,最令人放不下的莫过于 voter 处于 in doubt 状态,总感觉这对系统来说是不详的征兆。事实也的确如此,in doubt 的问题就在于 voter 上的该事务还手握许多数据的锁,如修改数据时需要获取行级别的互斥锁,防止脏写;在利用 2PL (Two-Phase Locking) 实现可序列化隔离时,读取数据时需要获取行级别的共享锁。而这些锁在 coordinator 恢复之前不能释放。如果 coordinator 在崩溃后 20 分钟才恢复,那么这些锁也将被持有 20 分钟。在此期间,没有其它事务可以修改这些行,大量的事务将被阻塞,甚至导致应用不可用。更糟糕的是,如果 coordinator 恢复后发现预写的日志丢失,则这些锁将被永久地持有,或者至少需要管理员手动干预。

Limitations of distributed transactions

XA 事务虽然利用 2PC 实现了异构分布式事务,但同时也引入了许多运维问题。认清这点的关键在于 coordinator 本身在某种意义上就已经是一个数据库,它保存着 2PC 的状态,因此数据库需要面对的问题 coordinator 也需要面对:

  • 如果 coordinator 是单机服务,没有复制节点,它将成为系统的单点故障。令人吃惊的是,许多 coordinator 的实现默认并未做高可用支持

  • 许多应用都被开发成无状态的形式,将系统状态存入数据库中,使得应用可以支持横向扩展。然而如果 coordinator 也是应用的一部分,coordinator 的日志就必须被持久化,这样应用就不再无状态

  • 由于 XA 需要和各种各样的数据系统兼容,它很难去检测死锁、很难实现 SSI (Serializable Snapshot Isolation),前者要求系统之间有标准的协议来交换锁信息,后者要求系统之间有标准的协议来发现系统之间的冲突关系

  • 对于数据库内部分布式事务来说,限制少一些,如实现 SSI 是可能的。然而 2PC 要推进,必须要所有的参与者都响应,因此系统中的任意参与者出现故障,事务就会失败。因此分布式事务在某种意义上放大了故障影响,这与我们想要利用分布式来增加系统容错能力的目的背道而驰。

这些因素是否意味着我们需要放弃保持多个系统数据一致的希望?并非如此,在往后的章节中我们将讨论相关的替代方案。

Fault-Tolerant Consensus

一个具备容错能力的共识算法必须满足以下特性:

  • Uniform Agreement:所有节点做出的决定必须相同

  • Integrity:每个节点只能做一次决定,不能反悔

  • Validity:如果一个决定的决定是 v,v 必须是其它某个节点提出的

  • Termination:每个没有崩溃的节点最终都会做出某个决定

Uniform Agreement 和 Integrity 确定了共识的核心观点:每个节点都做出相同的决定,一旦做出决定不能反悔;Validity 保证每个节点所做的决定不是随意、盲目的决定,必须是合理的决定。如果不考虑容错能力,那么要满足前三点很容易,直接指定其中一个节点为独裁者,由它来做所有决定,其它节点服从即可,考虑容错,即独裁者节点可能发生崩溃的情况,这时候系统就无法继续做出决定。2PC 不具备容错能力就在于如果 coordinator 故障,系统就会停滞不前,处于 in doubt 的节点无法擅自决定 abort/commit。Termination 要求共识算法不能停滞不前,它必须在系统部分节点故障后仍然能够继续达成共识。尽管 Termination 要求共识算法不能停滞不前,但如果所有节点都发生故障无法继续运行,再神奇的共识算法也无能为力。因此具备容错能力的共识算法所能容忍的故障量有限。研究者已经证明,任何共识算法的正常运行至少需要半数以上的节正常运行。

Consensus algorithms and total order broadcast

当前最受关注的具备容错能力的共识算法包括 VSR (Viewstamped Replication)、Paxos、Raft 以及 Zab。这些算法之间有许多相似之处,本章不会进入这些算法的实现细节,但如果不是需要实现共识算法本身,了解一些 high-level 的直觉就足够。

大多数容错共识算法通常会对一系列数值做决定,即连续地达成共识,在达成共识的过程中,所有共识合起来形成了一个全序,因此连续达成共识的过程就是 total order broadcast 的过程,这便是 consensus 与后者之间的内在联系。

  • Uniform Agreement 保证所有节点按相同顺序传递相同的消息

  • Integrity 保证消息不重复

  • Validity 保证消息不会被破坏,也不会被捏造

  • Termination 保证消息不丢失

VSR、Raft、Zab 以及 Multi-Paxos 都直接实现了 total order broadcast。

Single-leader replication and consensus

Replication 一节中,我们讨论了单领导复制系统,所有的写请求通过 leader 按相同的顺序传递给 followers,保证所有复制节点的数据一致,这不就是 total order broadcast 吗?为什么我们在该章节中完全不需要考虑共识问题?

因为在单领导复制系统中,leader 节点通常由管理员直接配置指定,是一个独裁的共识。如果 leader 节点宕机,在管理员手动配置一个新的 leader 节点前系统将不可用。这样的单领导复制系统在实践中虽然被使用,但它并不满足容错共识算法的 Termination 要求,即在 leader 节点故障时需要人为干扰保证系统继续运行。如果系统能够自动完成领导选举 (leader election)、故障转移,实际上就接近了容错共识算法,但还不够。如果系统中有两个节点都认为自己是 leader 节点,就有可能出现 split brain 问题,因此系统还需要对 "领导选举" 这个决定达成共识。

这似乎意味着,想要选出一个领导,我们首先需要一个领导;想要达成共识,我们首先先要解决共识。如何走出这个鸡生蛋、蛋生鸡的迷宫?

Epoch numbering and quorums

所有提到的容错共识算法内部都存在 leader 的概念,但 leader 并非永远是同一个节点。通常每个协议都会有 epoch number 的概念 (即 Paxos 中的 ballot number、VSR 中的 view number 以及 Raft 中的 term number),并保证在每个 epoch 内只有一个唯一的 leader。

每当当前的 leader 失联,剩下的节点就会启动选举过程,并将 epoch number 增加 1,因此 epoch number 单调递增。如果在不同 epoch 中选举出了不同的 leader,那么在较大 epoch 中选举出的 leader 将被认为是真正的 leader。在选举过程中,leader 需要通过请求选票的过程来确定当前没有其它来自更大 epoch 的 leader 在做决定,如果有则 leader 放弃选举。如果 leader 顺利获得了达到法定人数要求的选票,那么它就可以认为自己成为了 leader。

Limitations of consensus

容错共识算法是分布式系统研究上的重大突破,它们在不确定极大的分布式环境中为系统的正确性、容错性保驾护航,它们还支持 total order broadcast,从而使得在分布式环境中实现可序列化原子操作成为可能。即便如此,它们并没有被应用到任何地方,因为这些好处并不是免费的午餐,需要一定的成本。

  • 在决定之前先提出想法以期达成共识的过程是一个同步复制的过程,同步复制势必要影响数据库性能。因此大多数数据库使用异步复制,宁可冒着在故障转移过程中失去已经提交数据的风险,也要获得更好的性能。

  • 共识系统通常要求投票过程需要半数以上节点参与,这就以为着要实现容错,系统至少需要有 3 个节点来容忍 1 个节点故障,或者 5 个节点来容忍 2 个节点故障。如果网络分区将一些节点隔离,那么只有拥有多数节点的分区能够继续运行,而少数节点分区则阻塞。

  • 许多共识算法都假设参与投票的节点是固定的,这意味着你无法动态地增加或删除节点。但这并不绝对,现在已经有 dynamic membership 技术允许用户动态增减节点。

  • 共识系统通常依靠超时机制来检测故障节点,在网络波动较大的环境中,将导致过多的领导选举过程,影响系统性能。

  • 有时候,共识算法对网络问题十分敏感。Raft 就被验证过在极端情况下可能出现 leader 在两个节点之间不断交换,导致系统无法推进。其它共识算法也有类似的问题,因此设计一个对于不稳定的网络环境表现更健壮的共识算法还是一个开放性问题。

Membership and Coordination Services

类似 ZooKeeper 与 etcd 这样的项目通常被描述为 "分布式键值数据库" 或者 "协调与配置服务"。从这些服务的 API 来看,它们就是数据库,但它们与普通数据库有什么区别?ZooKeeper 与 etcd 在设计上就是针对少量能直接放入内存中的数据(尽管它们会利用磁盘做数据持久化),因此将应用的数据放入其中不太合适。通常 ZooKeeper 中保存的数据变化缓慢,可能几分钟甚至几个小时才会变化一次。利用具备容错能力的 total order broadcast 算法,这些数据会被复制到所有节点并保证一致性。

ZooKeeper 被认为是 Google 的 Chubby lock service 的非官方实现,它不仅仅实现了 total order broadcast,同时也支持一些对构建分布式系统十分有帮助的特性:

  • Linearizable atomic operations:利用原子的 compare-and-set 操作,你能够实现一个锁。如果多个节点并发地尝试执行相同的操作,只能有一个成功。ZooKeeper 的共识算法能够保证操作的原子性和线性化。通常分布式锁会被赋予租期(lease),如果客户端故障,已经被获取的锁将在一定期限后失效。

  • Total ordering of operations:Zookeeper 为所有操作赋予单调递增的事务 ID(zxid)以及版本号(cversion),从而保证所有操作的全序。利用操作的全序,Zookeeper 可以防止当遇到程序发生暂停后作出前后矛盾的决定。

  • Failure detection:ZooKeeper 的客户端与之维持一个长连接,client 与 server 之间周期性的交换心跳信息来确保对方存在。即便偶尔连接中断,或者 ZooKeeper 节点故障,二者之间的 session 仍保持活跃。如果间隔时间过长,ZooKeeper 将认为 session 失效,任何该 session 持有的锁将被自动释放,即 ZooKeeper 中的 ephemeral nodes。

  • Change notifications:除了访问锁以及键值信息,ZooKeeper 的客户端还能够监听指定数据的变化。因此客户端能够通过这种机制对关心的变化事件作出反应。通过订阅通知(subscribing to notifications),客户端能够避免通过频繁轮询来实现相同的功能。

在 ZooKeeper 的这些特征中,只有 linearizable atomic operations 需要共识算法的支持,然而正是这些功能的组合使得 ZooKeeper 能够为构建分布式系统提供很大帮助。

Allocating word to nodes

想象以下两个场景:

  • 如果你有服务的多个实例,需要实现 master/slave 或者 leader/follower 关系,且当 master/leader 故障时,其它节点能够被升级为 master/leader。

  • 如果你的资源存在不同的分区,需要决定哪些分区被分配给哪些节点,且在系统运行过程中会有新的节点接入,旧的节点移除,你的资源相应地需要重新平衡分配到这些节点上。

类似这种配置决定,在 ZooKeeper 中可以通过 atomic operations、ephemeral nodes 以及 notifications 特性组合实现,从而使得系统得以在没有人工干预的情况下自动恢复。尽管要做到实现正确并不容易,但这也比实现共识算法来得容易。

一个应用可能从一个节点开始不断扩容,最终成长为上千个节点的大型应用。想要让这么多个节点参与投票是一件十分低效的事情。通常 ZooKeeper 会被部署在固定数量的节点上,因此我们可以将这种达成共识的事情外包给 ZooKeeper。

Service discovery

ZooKeeper、etcd 以及 Consul 经常被用于实现服务发现平台,即通过服务名称找到服务所在的具体 IP 及端口。在云平台、微服务架构下,虚拟机可能随时启动与关闭,你通常无法预先知道你的服务将被部署在哪个 ip、哪个端口上。因此,通常的做法是在服务启动时,将服务的信息注册到服务发现中心,在服务发现中心可以找到其它你可能关心的服务的地址。

然而,服务发现的实现似乎并不一定需要共识。服务发现的祖先 DNS 就是利用多层缓存来保证性能与可用性,但从 DNS 中读取的数据肯定不支持可线性化,但 DNS 数据的延迟并不是什么大问题,更重要的是它在不稳定网络环境下的可靠性和健壮性。

尽管服务发现本身不需要共识,但领导选举需要。如果你的共识系统已经选举出了领导,那么由他来帮助其它服务发现 leader 很有意义。为此,一些共识系统支持只读的缓存复制节点,这些节点异步地接收共识算法做出的所有决定,但并不参与投票过程,它们可以被用于服务那些不需要支持可序列化的读请求。

Membership services

membership service 决定当前哪些节点是该集群的活跃节点。在 The Trouble with Distributed Systems 中说过,在没有限制的网络延迟下,系统无法判断节点是否真的发生了故障。然而,如果你将 failure detection 与 consensus 结合集群中的节点可以对 "哪些节点还活着" 这件事达成共识。尽管无法避免误判,但达成共识对系统依然很有帮助。