1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
|
// HandleAppendEntries is the receiver side of the AppendEntries RPC. It runs
// entirely under rf.mu and fills in reply for the sender.
//
// The reply starts out as a rejection (Success=false, XTerm/XIndex/Xlen=-1)
// carrying the receiver's current term, and is only flipped to Success=true
// once the log consistency check at args.PrevLogIndex has passed and any new
// entries have been merged into rf.logs.
//
// On rejection because of a log mismatch the reply carries hints:
//   - log too short: XIndex = lastLogIndex + 1, XTerm stays -1;
//   - term conflict at PrevLogIndex: XTerm = the conflicting term, XIndex =
//     Index of the nearest earlier entry whose term differs from XTerm
//     (or -1 if no such entry exists at an index > 0).
//
// NOTE(review): Xlen is never set to anything other than -1 here, and XIndex
// in the conflict case points at the entry *before* the XTerm run rather than
// at its first entry — confirm this matches what the sender's backtracking
// logic expects.
func (rf *Raft) HandleAppendEntries(args *AppendEnrtiesRequest, reply *AppendEnrtiesResponse) {
	rf.mu.Lock()
	defer rf.mu.Unlock()

	// Pessimistic defaults; overwritten below on the relevant paths.
	reply.Success = false
	reply.Term = rf.currentTerm
	reply.XTerm = -1
	reply.XIndex = -1
	reply.Xlen = -1

	// Debug output is only emitted while the log holds a single entry
	// (presumably just a sentinel at index 0 — TODO confirm).
	if len(rf.logs) == 1 {
		DPrintf("%v received AppendEntries from S%d (term=%d, prevLogIndex=%d, prevLogTerm=%d, entries=%d)\n",
			rf, args.LeaderID, args.Term, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries))
	}

	// Reject requests from a stale term; reply.Term lets the sender catch up.
	if args.Term < rf.currentTerm {
		if len(rf.logs) == 1 {
			DPrintf("%v received AppendEntries from S%d 137 return\n", rf, args.LeaderID)
		}
		return
	}

	// A candidate that sees an AppendEntries for its own term steps down.
	if args.Term == rf.currentTerm && rf.state == CANDIDATE {
		rf.changeState(FOLLOWER)
		rf.persist()
	}

	// A newer term always forces follower state and a term update.
	if args.Term > rf.currentTerm {
		rf.changeState(FOLLOWER)
		rf.currentTerm = args.Term
		rf.persist()
		reply.Term = rf.currentTerm
	}

	// The request's term is current, so it counts as contact from the leader.
	rf.resetElectionTimer()

	// Consistency check, part 1: our log must reach PrevLogIndex.
	lastLogIndex, _ := rf.getLastLog()
	if lastLogIndex < args.PrevLogIndex {
		reply.XIndex = lastLogIndex + 1
		if len(rf.logs) == 1 {
			DPrintf("%v received AppendEntries from S%d 160 return, XIndex is %d\n", rf, args.LeaderID, reply.XIndex)
		}
		return
	}

	// Consistency check, part 2: the entry at PrevLogIndex must have the
	// expected term. On mismatch, walk backwards past every entry of the
	// conflicting term and report where that run ends.
	if rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm {
		reply.XTerm = rf.logs[args.PrevLogIndex].Term
		for i := args.PrevLogIndex; i > 0; i-- {
			if rf.logs[i].Term != reply.XTerm {
				reply.XIndex = rf.logs[i].Index
				break
			}
		}
		return
	}

	if len(args.Entries) > 0 {
		// DPrintf("%v, rf.logs: %v, entries: %v\n", rf, rf.logs, args.Entries)
		// Skip the prefix of args.Entries that we already hold (same term at
		// the same position). Only truncate and append from the first
		// divergence, so a delayed/duplicate RPC cannot chop off newer entries.
		i, j := args.PrevLogIndex+1, 0
		for i < len(rf.logs) && j < len(args.Entries) && rf.logs[i].Term == args.Entries[j].Term {
			i += 1
			j += 1
		}
		if j < len(args.Entries) {
			rf.logs = append(rf.logs[:i], args.Entries[j:]...)
			rf.persist()
			// DPrintf("%v appends log entries: %v", rf, args.Entries[j:])
		}
	}
	reply.Success = true

	// Advance commitIndex to min(leaderCommit, index of last new entry).
	// The bound must be the last index this RPC actually verified, NOT the
	// end of our log: after a heartbeat or short batch we may still hold a
	// stale suffix beyond PrevLogIndex+len(Entries) that has not been checked
	// against the leader, and committing into it would apply wrong entries.
	lastNewIndex := args.PrevLogIndex + len(args.Entries)
	newCommitIndex := min(args.LeaderCommit, lastNewIndex)
	if newCommitIndex > rf.commitIndex {
		rf.commitIndex = newCommitIndex
		rf.applyCond.Signal()
	}
}
|