Lab: Raft - Log Replication

说在前面

在实验的过程中,一定要反复阅读:

本节主要包含以下几个部分:

  • Send AppendEntries RPC

  • Receive AppendEntries RPC

  • Safe Replication Checking Daemon

  • CommitIndex Checking Daemon

Send AppendEntries RPC

HeartBeats & AppendEntries

想明白 HeartBeats 和 AppendEntries 的关系花了我很长时间,其实二者是相同的,AppendEntries 消息同时包含 HeartBeats 和 AppendEntries 两个功能,关于这一点,在 Students Guide To Raft 一文中也有很详细地描述。我在最开始实现时就是构建了两个函数,sendHeartBeatssendAppendEntries,后来这两个函数越写越像,才最终注意到这点。

AppendEntries Sending Daemon

为了代码可读性更高,这里同样适用 Daemon 来发送 AppendEntries RPCs。

{
    for i, _ := range rf.peers {
		if i != rf.me {
			go func(server int) {
				rf.sendAppendEntriesMessage(server)
			}(i)
		}
	}
}

rf.sendAppendEntriesMessage(server) 内部,循环发送消息。

AppendEntries Sending Loop

最开始我们的实现可能是这样:

func (rf *Raft) sendAppendEntriesMessage(server int) {
    for {
        if !rf.isLeader() {
            time.Sleep(HeartBeatTimeout)
            continue
        }
        // args, reply preparation...
        ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
        if ok {
            // peer responds to request
        } else {
            // timeout
        }
        
        time.Sleep(HeartBeatTimeout)
    }
}

这样的实现可以帮助通过大部分测试,但当设定的网络状态变成 unreliable 时,即 TestUnreliableAgree2C, TestFigure8Unreliable2C, TestUnreliableChurn2C,就会出现各种问题。原因在于 rf.sendAppendEntriesMessage() 需要完成 HeartBeats 的功能,如果网络延迟很大,该 goroutine 将无法在等待上一个请求结果的时候发送 HeartBeats,导致 Election Timeout 频繁发生,不断选举。

因此,我们需要将请求及其结果处理的代码放进 goroutine 中:

func (rf *Raft) sendAppendEntriesMessage(server int) {
    for {
        if !rf.isLeader() {
            time.Sleep(HeartBeatTimeout)
            continue
        }
        // args, reply preparation...
        go func() {
            ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
            if ok {
                // peer responds to request
            } else {
                // timeout
            }
        }()    
        time.Sleep(HeartBeatTimeout)
    }
}

Negotiation Strategy

在 Leader 与 Follower 协商正确的 PrevLogEntry 的过程中,Raft 建议使用每次递减 1 的策略(见 Raft p8),但使用该策略无法通过本实验的测试,因此需要实现更快速、高效的策略。

Receive AppendEntries RPC

严格按照 Figure 2 实现即可,没有什么坑。

Safe Replication Checking Daemon

这里也将 Safe Replication Checking 逻辑单独拿到 Daemon 中处理,增加代码的可读性。

CommitIndex Checking Daemon

只有当 commitIndex > lastApplied 的时候执行命令,Leader 执行命令后相当于告诉 Client 命令执行完毕

go func() {
	for {
		rf.mu.Lock()
		if rf.commitIndex > rf.lastApplied {
			le := rf.logs[rf.lastApplied]
			rf.applyCh<-ApplyMsg{
				CommandValid: true,
				Command:      le.Command,
				CommandIndex: le.Index,
			}
			rf.lastApplied += 1
		}
		rf.mu.Unlock()
		time.Sleep(CheckLastAppliedTimeout)
	}
}()

Last updated