Lab: Raft - Log Replication
说在前面
在实验的过程中,一定要反复阅读:
Raft Paper Figure 2
6.824 Course Notes
本节主要包含以下几个部分:
Send AppendEntries RPC
Receive AppendEntries RPC
Safe Replication Checking Daemon
CommitIndex Checking Daemon
Send AppendEntries RPC
HeartBeats & AppendEntries
想明白 HeartBeats 和 AppendEntries 的关系花了我很长时间,其实二者是相同的,AppendEntries 消息同时包含 HeartBeats 和 AppendEntries 两个功能,关于这一点,在 Students Guide To Raft 一文中也有很详细地描述。我在最开始实现时就是构建了两个函数,sendHeartBeats
和sendAppendEntries
,后来这两个函数越写越像,才最终注意到这点。
AppendEntries Sending Daemon
为了代码可读性更高,这里同样适用 Daemon 来发送 AppendEntries RPCs。
{
for i, _ := range rf.peers {
if i != rf.me {
go func(server int) {
rf.sendAppendEntriesMessage(server)
}(i)
}
}
}
在 rf.sendAppendEntriesMessage(server)
内部,循环发送消息。
AppendEntries Sending Loop
最开始我们的实现可能是这样:
func (rf *Raft) sendAppendEntriesMessage(server int) {
for {
if !rf.isLeader() {
time.Sleep(HeartBeatTimeout)
continue
}
// args, reply preparation...
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
if ok {
// peer responds to request
} else {
// timeout
}
time.Sleep(HeartBeatTimeout)
}
}
这样的实现可以帮助通过大部分测试,但当设定的网络状态变成 unreliable 时,即 TestUnreliableAgree2C, TestFigure8Unreliable2C, TestUnreliableChurn2C,就会出现各种问题。原因在于 rf.sendAppendEntriesMessage() 需要完成 HeartBeats 的功能,如果网络延迟很大,该 goroutine 将无法在等待上一个请求结果的时候发送 HeartBeats,导致 Election Timeout 频繁发生,不断选举。
因此,我们需要将请求及其结果处理的代码放进 goroutine 中:
func (rf *Raft) sendAppendEntriesMessage(server int) {
for {
if !rf.isLeader() {
time.Sleep(HeartBeatTimeout)
continue
}
// args, reply preparation...
go func() {
ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
if ok {
// peer responds to request
} else {
// timeout
}
}()
time.Sleep(HeartBeatTimeout)
}
}
Negotiation Strategy
在 Leader 与 Follower 协商正确的 PrevLogEntry 的过程中,Raft 建议使用每次递减 1 的策略(见 Raft p8),但使用该策略无法通过本实验的测试,因此需要实现更快速、高效的策略。
Receive AppendEntries RPC
严格按照 Figure 2 实现即可,没有什么坑。
Safe Replication Checking Daemon
这里也将 Safe Replication Checking 逻辑单独拿到 Daemon 中处理,增加代码的可读性。
CommitIndex Checking Daemon
只有当 commitIndex > lastApplied 的时候执行命令,Leader 执行命令后相当于告诉 Client 命令执行完毕
go func() {
for {
rf.mu.Lock()
if rf.commitIndex > rf.lastApplied {
le := rf.logs[rf.lastApplied]
rf.applyCh<-ApplyMsg{
CommandValid: true,
Command: le.Command,
CommandIndex: le.Index,
}
rf.lastApplied += 1
}
rf.mu.Unlock()
time.Sleep(CheckLastAppliedTimeout)
}
}()
Last updated