3B感觉要难不少,也需要仔细阅读 手册

所需字段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	state           State       // rf当前的状态
	heartbeatTimer  *time.Timer // 心跳timer
	electionTimeout *time.Timer // 超时选举 timer
	currentTerm     int         //当前的任期
	votedFor        int         // 投票给了哪个节点
	logs            []LogEntry  // 日志
	nextIndex       []int       // 要传给节点的下一个index
	matchIndex      []int       // 节点已经匹配的index
	commitIndex     int         // 已经提交的index
	lastApplied     int         //  已经返回给客户端的index
	applyCond       *sync.Cond
	applyCh         chan ApplyMsg
}

Append Entry

Leader 需要向所有的 Follower 发送日志,我在这里的就是将心跳和发送日志采用了同一个函数,每一次心跳都会调用requestAppendEntries()这个函数,但只有发现peer的日志落后时才会附带日志。为了避免单个网络差的 Follower 阻塞整个集群的日志提交进度, Leader 会为每一个 Peer 启动一个独立的后台协程。一旦发现 Follower 落后(nextIndex <= lastLogIndex),就立即构造 AppendEntries RPC 发送日志。如果不落后,则是一个空的心跳。对于有日志发送的rpc,直到Follower完全跟上Leader日志才会退出。

这里日志采用了Fast Backup (冲突优化的快速回溯),利用 Follower 返回的 XTermXIndex 信息,允许 Leader 在一次 RPC 交互中直接跨越整个 Term 的冲突日志,处理的时候有三种情况:

1. XTerm == -1:Follower 日志太短

  • XTerm == -1 表示该 peer 的日志长度比 PrevLogIndex 短,但并不是 term 冲突。
  • 说明该 peer 没有落后整个 term,只是缺少了一段日志。
  • Leader 直接将 nextIndex 设置为 reply.XIndex(即该 peer 的日志末尾 + 1)。
  • 之后继续循环发送,就能补齐该 peer 缺失的部分日志。

2. XTerm != -1 且 Leader 无此 term:Follower 有旧 term,但 Leader 没有

  • XTerm 表示 Follower 在 PrevLogIndex 出现了 冲突 term
  • Leader 自己没有这个 term,说明这整段日志都是冲突且必须覆盖。
  • 因此直接将 nextIndex 回退到 reply.XIndex(该 term 在 Follower 中的起始位置)。
  • 下一轮 AppendEntries 会覆盖 Follower 的这个冲突 term。

3. XTerm != -1 且 Leader 有此 term:双方都有该 term

  • Leader 找到自己日志中 XTerm 的最后一个 index(lastIndexOfXTerm)。
  • 由于双方都包含这个 term,可以跳过整段 term —— 这是 Raft 的 快速回退优化
  • nextIndex 设置为 lastIndexOfXTerm + 1,直接跳到 term 末尾。
  • 下一次发送能迅速对齐双方日志,避免逐条回退。

这部分容易错的点不多,主要还是要检查rpc,避免发送前后的状态不一致,导致使用了落后的返回值。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
func (rf *Raft) requestAppendEntries() {
	for peerId := range rf.peers {
		if peerId == rf.me {
			continue
		}

		go func(peerId int) {
			for {
				rf.mu.Lock()
				if rf.state != LEADER {
					rf.mu.Unlock()
					return
				}

				next := rf.nextIndex[peerId]

				// if next <= rf.matchIndex[peerId] {
				// 	next = rf.matchIndex[peerId] + 1
				// }

				DPrintf(dAppend, "S%d send append to S%d\n", rf.me, peerId)
				if next <= rf.lastIncludedIndex {
					rf.mu.Unlock()
					rf.InstallSnapshot(peerId)
					return
				}

				idx := next - rf.lastIncludedIndex
				lastLogIndex, _ := rf.getLastLog()
				containEntries := lastLogIndex >= next

				entries := make([]LogEntry, len(rf.logs[idx:]))
				if containEntries {
					copy(entries, rf.logs[idx:])
					DPrintf(dAppend, "%v Send AppendEntries to S%d, next is %d, entries is %v\n", rf, peerId, next, entries)
				}
				prevLogIndex := next - 1
				prevLogTerm := rf.logs[idx-1].Term
				args := AppendEnrtiesArgs{rf.currentTerm, rf.me, prevLogIndex, prevLogTerm, entries, rf.commitIndex}
				reply := AppendEnrtiesReply{}

				rf.mu.Unlock()
				if !rf.sendAppendEntries(peerId, &args, &reply) {
					return
				}

				rf.mu.Lock()

				if args.Term != rf.currentTerm || rf.state != LEADER {
					rf.mu.Unlock()
					return
				}

				if reply.Term > rf.currentTerm {
					rf.changeState(FOLLOWER)
					rf.currentTerm = reply.Term
					rf.persist(nil)
					rf.mu.Unlock()
					return
				}

				if reply.Success {

					if containEntries {
						rf.matchIndex[peerId] = prevLogIndex + len(entries)
						rf.nextIndex[peerId] = prevLogIndex + len(entries) + 1
						indexArr := make([]int, len(rf.peers))
						copy(indexArr, rf.matchIndex)
						DPrintf(dAppend, "S%d: %v\n", rf.me, indexArr)
						sort.Ints(indexArr)
						newCommitIndex := indexArr[(len(indexArr)-1)/2]
						if newCommitIndex > rf.commitIndex && rf.logs[newCommitIndex-rf.lastIncludedIndex].Term == rf.currentTerm {
							rf.commitIndex = newCommitIndex
							rf.applyCond.Signal()
						}
					}
					rf.mu.Unlock()
					return
				} else {
					if reply.XTerm == -1 {
						rf.nextIndex[peerId] = reply.XIndex
					} else {
						lastIndexOfXTerm := rf.findLastIndexOfTerm(reply.XTerm)
						if lastIndexOfXTerm == -1 {
							rf.nextIndex[peerId] = reply.XIndex
						} else {
							rf.nextIndex[peerId] = lastIndexOfXTerm + 1
						}
					}
					rf.mu.Unlock()
				}
			}

		}(peerId)
	}
}

Handle Append Entry

1. Term 检查: 避免发消息的是一个过期的 Leader: 当Term没问题后,就确认了Leader的身份,就可以接收心跳,重置选举超时了。

2. 快速回溯:

这里有两种情况需要返回并告知Leader:

  1. 日志缺失: 如果 rf.getLastLog() < args.PrevLogIndex,说明 Follower 缺少中间的一段日志。直接返回 XIndex = lastLogIndex + 1,告诉 Leader:“我这里只有这么多,请从这个位置开始发。”

  2. 存在这个index但任期冲突: 如果 rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm,说明在接缝处发生了历史分歧(通常是因为旧 Leader 崩溃导致)。寻找冲突 Term 的起始位置。向前遍历日志,找到该冲突 Term 的第一条日志索引作为 XIndex。这样 Leader就可以 一次性跳过整个冲突 Term,而不是每次 RPC 只回退一个 Index。

  3. 幂等追加:逐条比对,找到第一个不匹配的位置,只有当真正出现冲突或有新数据时,才进行截断和追加。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
func (rf *Raft) HandleAppendEntries(args *AppendEnrtiesRequest, reply *AppendEnrtiesResponse) {

	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Success = false
	reply.Term = rf.currentTerm
	reply.XTerm = -1
	reply.XIndex = -1
	reply.Xlen = -1

  // 检查任期是否符合要求
	if args.Term < rf.currentTerm {
		if len(rf.logs) == 1 {
			DPrintf("%v received AppendEntries from S%d, but term ineligible\n", rf, args.LeaderID)
		}

		return
	}

	if args.Term >= rf.currentTerm {
    if rf.state == CANDIDATE{
      rf.changeState(FOLLOWER)
    } 
		rf.currentTerm = args.Term
		rf.persist()
		reply.Term = rf.currentTerm
	}
	rf.resetElectionTimer()

	lastLogIndex, _ := rf.getLastLog()

  // 告诉leader我们缺少的log index的位置
	if lastLogIndex < args.PrevLogIndex {
		reply.XIndex = lastLogIndex + 1
		return
	}

	if rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
		reply.XTerm = rf.logs[args.PrevLogIndex].Term
		for i := args.PrevLogIndex; i > 0; i-- {
			if rf.logs[i].Term != reply.XTerm {
				reply.XIndex = rf.logs[i].Index
				break
			}
		}
		return
	}

	if len(args.Entries) > 0 {
		// DPrintf("%v, rf.logs: %v, entries: %v\n", rf, rf.logs, args.Entries)

		i, j := args.PrevLogIndex+1, 0
		for i < len(rf.logs) && j < len(args.Entries) && rf.logs[i].Term == args.Entries[j].Term {
			i += 1
			j += 1
		}
		if j < len(args.Entries) {
			rf.logs = append(rf.logs[:i], args.Entries[j:]...)
			rf.persist()
			// DPrintf("%v appends log entries: %v", rf, args.Entries[j:])
		}

	}

	reply.Success = true

	newCommitIndex := min(args.LeaderCommit, len(rf.logs)-1)
	if newCommitIndex > rf.commitIndex {
		rf.commitIndex = newCommitIndex
		rf.applyCond.Signal()
	}

}

applySubmit

这个就没有什么过多可说的了,这就是一个消费协程,一旦 Leader 更新了 commitIndex 或者 Follower 收到新的 LeaderCommit,都会触发 Signal() 唤醒该协程。唤醒后复制所需要提交的日志,然后一定要先释放锁再提交。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// 需要一个单独的loop,发现有新的共识完成的时候,及时提交
func (rf *Raft) applySubmit() {

	for !rf.killed() {
		rf.mu.Lock()
		for rf.commitIndex <= rf.lastApplied && !rf.killed() {
			rf.applyCond.Wait()
		}
		applyMsgs := []ApplyMsg{}

		for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ {
			entry := rf.logs[i]
			applyMsg := ApplyMsg{
				CommandValid: true,
				Command:      entry.Command,
				CommandIndex: entry.Index,
			}
			applyMsgs = append(applyMsgs, applyMsg)
			rf.lastApplied += 1
		}

		DPrintf("%v applies log up to index=%d", rf, rf.lastApplied)

    // 提交可能比较耗时,所以先释放锁再提交
		rf.mu.Unlock()

		for _, msg := range applyMsgs {
			rf.applyCh <- msg
		}
	}

}

易错的点

首先还是手册里说到的,在append entry的时候,follower要只append不同的那一部分。原因是rpc到来的顺序 可能不一致,比如可能Leader先发了index 3,然后Leader又有了index 4,之后Leader发送了第二个rpc,但是由于网络延迟,第二个rpc先到,如果我们直接截取加append就会导致4的丢失(分布式一个最大的教训就是永远不要相信发来的rpc)。我在这里就错过,由于commitIndex还会是4,导致在applySubmit中commitIndex会大于数组长度,出现越界,由于之前没有好好看手册,在这里debug了好久。

The if here is crucial. If the follower has all the entries the leader sent, the follower MUST NOT truncate its log. Any elements following the entries sent by the leader MUST be kept. This is because we could be receiving an outdated AppendEntries RPC from the leader, and truncating the log would mean “taking back” entries that we may have already told the leader that we have in our log.

还有就是手册里说的心跳并不是一个特例,我在这里的想法是只要发送Append Entry时lastLogIndex >= next我们就附带entry。

Many of our students assumed that heartbeats were somehow “special”; that when a peer receives a heartbeat, it should treat it differently from a non-heartbeat AppendEntries RPC. In particular, many would simply reset their election timer when they received a heartbeat, and then return success, without performing any of the checks specified in Figure 2. This is extremely dangerous.