Lab: Raft - Log Replication

说在前面

在实验的过程中,一定要反复阅读:

本节主要包含以下几个部分:

  • Send AppendEntries RPC

  • Receive AppendEntries RPC

  • Safe Replication Checking Daemon

  • CommitIndex Checking Daemon

Send AppendEntries RPC

HeartBeats & AppendEntries

想明白 HeartBeats 和 AppendEntries 的关系花了我很长时间,其实二者是相同的,AppendEntries 消息同时包含 HeartBeats 和 AppendEntries 两个功能,关于这一点,在 Students Guide To Raft 一文中也有很详细地描述。我在最开始实现时就是构建了两个函数,sendHeartBeatssendAppendEntries,后来这两个函数越写越像,才最终注意到这点。

AppendEntries Sending Daemon

为了代码可读性更高,这里同样适用 Daemon 来发送 AppendEntries RPCs。

{
for i, _ := range rf.peers {
if i != rf.me {
go func(server int) {
rf.sendAppendEntriesMessage(server)
}(i)
}
}
}

rf.sendAppendEntriesMessage(server) 内部,循环发送消息。

AppendEntries Sending Loop

最开始我们的实现可能是这样:

func (rf *Raft) sendAppendEntriesMessage(server int) {
for {
if !rf.isLeader() {
time.Sleep(HeartBeatTimeout)
continue
}
// args, reply preparation...
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
if ok {
// peer responds to request
} else {
// timeout
}
time.Sleep(HeartBeatTimeout)
}
}

这样的实现可以帮助通过大部分测试,但当设定的网络状态变成 unreliable 时,即 TestUnreliableAgree2C, TestFigure8Unreliable2C, TestUnreliableChurn2C,就会出现各种问题。原因在于 rf.sendAppendEntriesMessage() 需要完成 HeartBeats 的功能,如果网络延迟很大,该 goroutine 将无法在等待上一个请求结果的时候发送 HeartBeats,导致 Election Timeout 频繁发生,不断选举。

因此,我们需要将请求及其结果处理的代码放进 goroutine 中:

func (rf *Raft) sendAppendEntriesMessage(server int) {
for {
if !rf.isLeader() {
time.Sleep(HeartBeatTimeout)
continue
}
// args, reply preparation...
go func() {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
if ok {
// peer responds to request
} else {
// timeout
}
}()
time.Sleep(HeartBeatTimeout)
}
}

Negotiation Strategy

在 Leader 与 Follower 协商正确的 PrevLogEntry 的过程中,Raft 建议使用每次递减 1 的策略(见 Raft p8),但使用该策略无法通过本实验的测试,因此需要实现更快速、高效的策略。

Receive AppendEntries RPC

严格按照 Figure 2 实现即可,没有什么坑。

Safe Replication Checking Daemon

这里也将 Safe Replication Checking 逻辑单独拿到 Daemon 中处理,增加代码的可读性。

CommitIndex Checking Daemon

只有当 commitIndex > lastApplied 的时候执行命令,Leader 执行命令后相当于告诉 Client 命令执行完毕

go func() {
for {
rf.mu.Lock()
if rf.commitIndex > rf.lastApplied {
le := rf.logs[rf.lastApplied]
rf.applyCh<-ApplyMsg{
CommandValid: true,
Command: le.Command,
CommandIndex: le.Index,
}
rf.lastApplied += 1
}
rf.mu.Unlock()
time.Sleep(CheckLastAppliedTimeout)
}
}()