# Lab: Raft - Log Replication

## 说在前面

在实验的过程中，一定要反复阅读：

* Raft Paper Figure 2
* 6.824 Course Notes
* [Students' Guide to Raft](https://thesquareplanet.com/blog/students-guide-to-raft/)
* [Raft Q\&A](https://thesquareplanet.com/blog/raft-qa/)

本节主要包含以下几个部分：

* Send AppendEntries RPC
* Receive AppendEntries RPC
* Safe Replication Checking Daemon
* CommitIndex Checking Daemon

## Send AppendEntries RPC

### HeartBeats & AppendEntries

想明白 HeartBeats 和 AppendEntries 的关系花了我很长时间，其实二者是相同的，AppendEntries 消息同时包含 HeartBeats 和 AppendEntries 两个功能，关于这一点，在 [Students Guide To Raft](https://thesquareplanet.com/blog/students-guide-to-raft/) 一文中也有很详细地描述。我在最开始实现时就是构建了两个函数，`sendHeartBeats` 和`sendAppendEntries`，后来这两个函数越写越像，才最终注意到这点。

### AppendEntries Sending Daemon

为了代码可读性更高，这里同样适用 Daemon 来发送 AppendEntries RPCs。

```go
{
    for i, _ := range rf.peers {
		if i != rf.me {
			go func(server int) {
				rf.sendAppendEntriesMessage(server)
			}(i)
		}
	}
}
```

在 `rf.sendAppendEntriesMessage(server)` 内部，循环发送消息。

### AppendEntries Sending Loop

最开始我们的实现可能是这样：

```go
func (rf *Raft) sendAppendEntriesMessage(server int) {
    for {
        if !rf.isLeader() {
            time.Sleep(HeartBeatTimeout)
            continue
        }
        // args, reply preparation...
        ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
        if ok {
            // peer responds to request
        } else {
            // timeout
        }
        
        time.Sleep(HeartBeatTimeout)
    }
}
```

这样的实现可以帮助通过大部分测试，但当设定的网络状态变成 unreliable 时，即 TestUnreliableAgree2C, TestFigure8Unreliable2C, TestUnreliableChurn2C，就会出现各种问题。原因在于 rf.sendAppendEntriesMessage() 需要完成 HeartBeats 的功能，如果网络延迟很大，该 goroutine 将无法在等待上一个请求结果的时候发送 HeartBeats，导致 Election Timeout 频繁发生，不断选举。

因此，我们需要将请求及其结果处理的代码放进 goroutine 中：

```go
func (rf *Raft) sendAppendEntriesMessage(server int) {
    for {
        if !rf.isLeader() {
            time.Sleep(HeartBeatTimeout)
            continue
        }
        // args, reply preparation...
        go func() {
            ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
            if ok {
                // peer responds to request
            } else {
                // timeout
            }
        }()    
        time.Sleep(HeartBeatTimeout)
    }
}
```

### Negotiation Strategy

在 Leader 与 Follower 协商正确的 PrevLogEntry 的过程中，Raft 建议使用每次递减 1 的策略（见 Raft p8），但使用该策略无法通过本实验的测试，因此需要实现更快速、高效的策略。

## Receive AppendEntries RPC

严格按照 Figure 2 实现即可，没有什么坑。

## Safe Replication Checking Daemon

这里也将 Safe Replication Checking 逻辑单独拿到 Daemon 中处理，增加代码的可读性。

## CommitIndex Checking Daemon

只有当 commitIndex > lastApplied 的时候执行命令，Leader 执行命令后相当于告诉 Client 命令执行完毕

```go
go func() {
	for {
		rf.mu.Lock()
		if rf.commitIndex > rf.lastApplied {
			le := rf.logs[rf.lastApplied]
			rf.applyCh<-ApplyMsg{
				CommandValid: true,
				Command:      le.Command,
				CommandIndex: le.Index,
			}
			rf.lastApplied += 1
		}
		rf.mu.Unlock()
		time.Sleep(CheckLastAppliedTimeout)
	}
}()
```
