# Lab: Raft - Log Replication

## 说在前面

在实验的过程中，一定要反复阅读：

* Raft Paper Figure 2
* 6.824 Course Notes
* [Students' Guide to Raft](https://thesquareplanet.com/blog/students-guide-to-raft/)
* [Raft Q\&A](https://thesquareplanet.com/blog/raft-qa/)

本节主要包含以下几个部分：

* Send AppendEntries RPC
* Receive AppendEntries RPC
* Safe Replication Checking Daemon
* CommitIndex Checking Daemon

## Send AppendEntries RPC

### HeartBeats & AppendEntries

想明白 HeartBeats 和 AppendEntries 的关系花了我很长时间，其实二者是相同的，AppendEntries 消息同时包含 HeartBeats 和 AppendEntries 两个功能，关于这一点，在 [Students Guide To Raft](https://thesquareplanet.com/blog/students-guide-to-raft/) 一文中也有很详细地描述。我在最开始实现时就是构建了两个函数，`sendHeartBeats` 和`sendAppendEntries`，后来这两个函数越写越像，才最终注意到这点。

### AppendEntries Sending Daemon

为了代码可读性更高，这里同样适用 Daemon 来发送 AppendEntries RPCs。

```go
{
    for i, _ := range rf.peers {
		if i != rf.me {
			go func(server int) {
				rf.sendAppendEntriesMessage(server)
			}(i)
		}
	}
}
```

在 `rf.sendAppendEntriesMessage(server)` 内部，循环发送消息。

### AppendEntries Sending Loop

最开始我们的实现可能是这样：

```go
func (rf *Raft) sendAppendEntriesMessage(server int) {
    for {
        if !rf.isLeader() {
            time.Sleep(HeartBeatTimeout)
            continue
        }
        // args, reply preparation...
        ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
        if ok {
            // peer responds to request
        } else {
            // timeout
        }
        
        time.Sleep(HeartBeatTimeout)
    }
}
```

这样的实现可以帮助通过大部分测试，但当设定的网络状态变成 unreliable 时，即 TestUnreliableAgree2C, TestFigure8Unreliable2C, TestUnreliableChurn2C，就会出现各种问题。原因在于 rf.sendAppendEntriesMessage() 需要完成 HeartBeats 的功能，如果网络延迟很大，该 goroutine 将无法在等待上一个请求结果的时候发送 HeartBeats，导致 Election Timeout 频繁发生，不断选举。

因此，我们需要将请求及其结果处理的代码放进 goroutine 中：

```go
func (rf *Raft) sendAppendEntriesMessage(server int) {
    for {
        if !rf.isLeader() {
            time.Sleep(HeartBeatTimeout)
            continue
        }
        // args, reply preparation...
        go func() {
            ok := rf.peers[server].Call("Raft.AppendEntries", args, reply)
            if ok {
                // peer responds to request
            } else {
                // timeout
            }
        }()    
        time.Sleep(HeartBeatTimeout)
    }
}
```

### Negotiation Strategy

在 Leader 与 Follower 协商正确的 PrevLogEntry 的过程中，Raft 建议使用每次递减 1 的策略（见 Raft p8），但使用该策略无法通过本实验的测试，因此需要实现更快速、高效的策略。

## Receive AppendEntries RPC

严格按照 Figure 2 实现即可，没有什么坑。

## Safe Replication Checking Daemon

这里也将 Safe Replication Checking 逻辑单独拿到 Daemon 中处理，增加代码的可读性。

## CommitIndex Checking Daemon

只有当 commitIndex > lastApplied 的时候执行命令，Leader 执行命令后相当于告诉 Client 命令执行完毕

```go
go func() {
	for {
		rf.mu.Lock()
		if rf.commitIndex > rf.lastApplied {
			le := rf.logs[rf.lastApplied]
			rf.applyCh<-ApplyMsg{
				CommandValid: true,
				Command:      le.Command,
				CommandIndex: le.Index,
			}
			rf.lastApplied += 1
		}
		rf.mu.Unlock()
		time.Sleep(CheckLastAppliedTimeout)
	}
}()
```


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://zhenghe.gitbook.io/open-courses/mit-6.824/lab-raft-log-replication.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
