3B感觉要难不少,也需要仔细阅读手册

所需字段

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
// Raft holds the state of a single Raft peer. The fields below mu are read
// and written by the replication, RPC-handler and apply code while holding mu.
type Raft struct {
	mu        sync.Mutex          // Lock to protect shared access to this peer's state
	peers     []*labrpc.ClientEnd // RPC end points of all peers
	persister *Persister          // Object to hold this peer's persisted state
	me        int                 // this peer's index into peers[]
	dead      int32               // set by Kill()

	state           State       // current role of this peer (e.g. LEADER, FOLLOWER, CANDIDATE)
	heartbeatTimer  *time.Timer // heartbeat timer
	electionTimeout *time.Timer // election timeout timer
	currentTerm     int         // current term
	votedFor        int         // peer this node voted for
	logs            []LogEntry  // log entries
	nextIndex       []int       // per peer: index of the next log entry to send to that peer
	matchIndex      []int       // per peer: index up to which that peer's log is known to match
	commitIndex     int         // index of the highest committed entry
	lastApplied     int         // index of the highest entry already handed to applyCh
	// applyCond is signalled whenever commitIndex advances; applySubmit waits on it.
	applyCond       *sync.Cond
	// applyCh receives one ApplyMsg per committed entry, sent by applySubmit.
	applyCh         chan ApplyMsg
}

Append Entry

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// requestAppendEntries starts one goroutine per remote peer that sends
// AppendEntries RPCs to that peer. A heartbeat is not a special case: every
// request carries all entries from nextIndex[peer] to the end of the log,
// which is simply empty when the peer is already up to date.
//
// Each goroutine retries with a lower nextIndex after a rejection and stops
// when the RPC fails, the peer accepts the request, this peer is no longer
// leader, or the term changed while the RPC was in flight.
func (rf *Raft) requestAppendEntries() {
	for peerId := range rf.peers {
		if peerId == rf.me {
			continue
		}

		go func(peerId int) {
			for {
				rf.mu.Lock()
				if rf.state != LEADER {
					rf.mu.Unlock()
					return
				}

				// nextIndex may have been pushed to or below matchIndex by a
				// stale/reordered rejection or by a reply without a usable
				// XIndex. Giving up here would silence this peer for the rest
				// of the term, so clamp instead: everything up to matchIndex is
				// already known to be replicated, and matchIndex+1 >= 1 keeps
				// the prevLogIndex lookup below in range.
				next := rf.nextIndex[peerId]
				if floor := rf.matchIndex[peerId] + 1; next < floor {
					next = floor
					rf.nextIndex[peerId] = next
				}

				if next == 1 {
					DPrintf("peerId: %d, next Index=1, match Index=%d\n", peerId, rf.matchIndex[peerId])
				}

				// Copy the suffix so the RPC layer never reads rf.logs after
				// the lock is released.
				entries := make([]LogEntry, len(rf.logs[next:]))
				copy(entries, rf.logs[next:])

				prevLogIndex := next - 1
				prevLogTerm := rf.logs[prevLogIndex].Term
				args := AppendEnrtiesRequest{rf.currentTerm, rf.me, prevLogIndex, prevLogTerm, entries, rf.commitIndex}
				reply := AppendEnrtiesResponse{}

				rf.mu.Unlock()
				if !rf.sendAppendEntries(peerId, &args, &reply) {
					return
				}

				rf.mu.Lock()
				DPrintf("%v Send AppendEntries to S%d, next is %d, entries is %v\n", rf, peerId, next, entries)

				// The term moved on while the RPC was in flight: the reply
				// belongs to an old term and must be ignored.
				if args.Term != rf.currentTerm {
					rf.mu.Unlock()
					return
				}

				// The peer is in a newer term: step down.
				if reply.Term > rf.currentTerm {
					rf.changeState(FOLLOWER)
					rf.currentTerm = reply.Term
					rf.persist()
					rf.mu.Unlock()
					return
				}

				if reply.Success {
					// Only move matchIndex forward; an old success reply that
					// arrives late must not undo newer progress.
					match := prevLogIndex + len(entries)
					if match > rf.matchIndex[peerId] {
						rf.matchIndex[peerId] = match
						rf.nextIndex[peerId] = match + 1

						// After an ascending sort, the element at (n-1)/2 is
						// matched or exceeded by a majority of matchIndex values.
						indexArr := make([]int, len(rf.peers))
						copy(indexArr, rf.matchIndex)
						sort.Ints(indexArr)

						newCommitIndex := indexArr[(len(indexArr)-1)/2]
						// Only entries of the current term are committed by counting replicas.
						if newCommitIndex > rf.commitIndex && rf.logs[newCommitIndex].Term == rf.currentTerm {
							rf.commitIndex = newCommitIndex
							rf.applyCond.Signal()
						}
					}
					rf.mu.Unlock()
					return
				}

				// Rejected: back nextIndex up using the conflict hints and retry.
				if reply.XTerm == -1 {
					// No conflicting term was reported; resume from the index the follower suggested.
					rf.nextIndex[peerId] = reply.XIndex
				} else {
					lastIndexOfXTerm := rf.findLastIndexOfTerm(reply.XTerm)
					if lastIndexOfXTerm == -1 {
						// Leader has no entry of XTerm: resume from the follower-supplied XIndex.
						rf.nextIndex[peerId] = reply.XIndex
					} else {
						// Leader has XTerm: resume right after its last entry of that term.
						rf.nextIndex[peerId] = lastIndexOfXTerm + 1
					}
				}
				rf.mu.Unlock()
			}

		}(peerId)
	}
}

Handle Append Entry

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
// HandleAppendEntries is the AppendEntries RPC handler. Heartbeats and
// entry-carrying requests go through exactly the same checks.
//
// reply.Term is set to this peer's term. reply.Success is true only when the
// log contains an entry at args.PrevLogIndex with term args.PrevLogTerm. On
// rejection the reply carries hints for fast backup:
//   - log too short: XTerm == -1 and XIndex is the first missing index;
//   - term conflict: XTerm is the conflicting term and XIndex is the index of
//     the first entry in the scanned prefix that has that term.
func (rf *Raft) HandleAppendEntries(args *AppendEnrtiesRequest, reply *AppendEnrtiesResponse) {

	rf.mu.Lock()
	defer rf.mu.Unlock()

	reply.Success = false
	reply.Term = rf.currentTerm
	reply.XTerm = -1
	reply.XIndex = -1
	reply.Xlen = -1

	if len(rf.logs) == 1 {
		DPrintf("%v received AppendEntries from S%d (term=%d, prevLogIndex=%d, prevLogTerm=%d, entries=%d)\n",
			rf, args.LeaderID, args.Term, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))
	}

	// Reject requests from an older term.
	if args.Term < rf.currentTerm {
		if len(rf.logs) == 1 {
			DPrintf("%v received AppendEntries from S%d 137 return\n", rf, args.LeaderID)
		}

		return
	}
	// A candidate that hears from a leader of its own term gives up the election.
	if args.Term == rf.currentTerm && rf.state == CANDIDATE {
		rf.changeState(FOLLOWER)
		rf.persist()
	}

	if args.Term > rf.currentTerm {
		rf.changeState(FOLLOWER)
		rf.currentTerm = args.Term
		rf.persist()
		reply.Term = rf.currentTerm
	}
	rf.resetElectionTimer()

	lastLogIndex, _ := rf.getLastLog()

	// Log too short to contain PrevLogIndex.
	if lastLogIndex < args.PrevLogIndex {
		reply.XIndex = lastLogIndex + 1
		if len(rf.logs) == 1 {
			DPrintf("%v received AppendEntries from S%d 160 return, XIndex is %d\n", rf, args.LeaderID, reply.XIndex)
		}
		return
	}

	// Term conflict at PrevLogIndex: report the conflicting term and where it
	// starts. The scan stops at position 1, so for PrevLogIndex >= 1 XIndex is
	// always a usable nextIndex, even when every scanned entry has XTerm.
	if rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
		reply.XTerm = rf.logs[args.PrevLogIndex].Term
		first := args.PrevLogIndex
		for first > 1 && rf.logs[first-1].Term == reply.XTerm {
			first--
		}
		reply.XIndex = rf.logs[first].Index
		return
	}

	if len(args.Entries) > 0 {
		// Skip the prefix of args.Entries that is already in the log. The log
		// is truncated only at the first real conflict, so a delayed request
		// can never remove entries that were already acknowledged.
		i, j := args.PrevLogIndex+1, 0
		for i < len(rf.logs) && j < len(args.Entries) && rf.logs[i].Term == args.Entries[j].Term {
			i += 1
			j += 1
		}
		if j < len(args.Entries) {
			rf.logs = append(rf.logs[:i], args.Entries[j:]...)
			rf.persist()
		}

	}

	reply.Success = true

	// Cap commitIndex by the last entry this request actually verified, not by
	// the log length: entries beyond that point have not been checked against
	// the leader's log and must not be committed.
	lastNewIndex := args.PrevLogIndex + len(args.Entries)
	newCommitIndex := min(args.LeaderCommit, lastNewIndex)
	if newCommitIndex > rf.commitIndex {
		rf.commitIndex = newCommitIndex
		rf.applyCond.Signal()
	}

}

applySubmit

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// applySubmit runs until the peer is killed. It sleeps on applyCond while
// there is nothing new to apply, then builds one ApplyMsg for every entry in
// (lastApplied, commitIndex] under the lock and sends them on applyCh after
// releasing it, so a slow receiver does not hold rf.mu.
func (rf *Raft) applySubmit() {
	for !rf.killed() {
		rf.mu.Lock()
		for rf.lastApplied >= rf.commitIndex && !rf.killed() {
			rf.applyCond.Wait()
		}

		// Collect the batch while holding the lock.
		var batch []ApplyMsg
		for rf.lastApplied < rf.commitIndex {
			committed := rf.logs[rf.lastApplied+1]
			batch = append(batch, ApplyMsg{
				CommandValid: true,
				Command:      committed.Command,
				CommandIndex: committed.Index,
			})
			rf.lastApplied++
		}

		DPrintf("%v applies log up to index=%d", rf, rf.lastApplied)
		rf.mu.Unlock()

		// Deliver outside the critical section.
		for k := range batch {
			rf.applyCh <- batch[k]
		}
	}
}

易错的点

首先还是手册里说到的:处理 AppendEntries 时,follower 只能从第一个与 leader 发来的条目冲突的位置截断并追加,不能无条件截断日志。我在这里就犯过这个错,导致在 applySubmit 中 commitIndex 大于日志数组长度,出现越界。

The if here is crucial. If the follower has all the entries the leader sent, the follower MUST NOT truncate its log. Any elements following the entries sent by the leader MUST be kept. This is because we could be receiving an outdated AppendEntries RPC from the leader, and truncating the log would mean “taking back” entries that we may have already told the leader that we have in our log.

还有就是手册里说的心跳并不是一个特例,我在这里的想法是只要发送Append Entry时lastLogIndex >= next我们就附带entry。

Many of our students assumed that heartbeats were somehow “special”; that when a peer receives a heartbeat, it should treat it differently from a non-heartbeat AppendEntries RPC. In particular, many would simply reset their election timer when they received a heartbeat, and then return success, without performing any of the checks specified in Figure 2. This is extremely dangerous.