Mit6.5840lab3B

3B感觉要难不少,也需要仔细阅读 手册 所需字段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() state State // rf当前的状态 heartbeatTimer *time.Timer // 心跳timer electionTimeout *time.Timer // 超时选举 timer currentTerm int //当前的任期 votedFor int // 投票给了哪个节点 logs []LogEntry // 日志 nextIndex []int // 要传给节点的下一个index matchIndex []int // 节点已经匹配的index commitIndex int // 已经提交的index lastApplied int // 已经返回给客户端的index applyCond *sync.Cond applyCh chan ApplyMsg } Append Entry 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 func (rf *Raft) requestAppendEntries() { for peerId := range rf.peers { if peerId == rf.me { continue } go func(peerId int) { for { rf.mu.Lock() if rf.state != LEADER { rf.mu.Unlock() return } next := rf.nextIndex[peerId] if next == 1 { DPrintf("peerId: %d, next Index=1, match Index=%d\n", peerId, rf.matchIndex[peerId]) } if next <= rf.matchIndex[peerId] { rf.mu.Unlock() return } lastLogIndex, _ := rf.getLastLog() containEntries := lastLogIndex >= next entries := make([]LogEntry, len(rf.logs[next:])) if containEntries { copy(entries, rf.logs[next:]) } prevLogIndex := next - 1 prevLogTerm := rf.logs[next-1].Term args := AppendEnrtiesRequest{rf.currentTerm, rf.me, prevLogIndex, prevLogTerm, entries, rf.commitIndex} reply := AppendEnrtiesResponse{} rf.mu.Unlock() if !rf.sendAppendEntries(peerId, &args, &reply) { return } rf.mu.Lock() DPrintf("%v Send AppendEntries to S%d, next is %d, entries is %v\n", rf, peerId, next, entries) if args.Term != rf.currentTerm { rf.mu.Unlock() return } if reply.Term 
> rf.currentTerm { rf.changeState(FOLLOWER) rf.currentTerm = reply.Term rf.persist() rf.mu.Unlock() return } if reply.Success { if containEntries { rf.matchIndex[peerId] = prevLogIndex + len(entries) rf.nextIndex[peerId] = prevLogIndex + len(entries) + 1 indexArr := make([]int, len(rf.peers)) copy(indexArr, rf.matchIndex) sort.Ints(indexArr) newCommitIndex := indexArr[(len(indexArr)-1)/2] if newCommitIndex > rf.commitIndex && rf.logs[newCommitIndex].Term == rf.currentTerm { rf.commitIndex = newCommitIndex rf.applyCond.Signal() } } rf.mu.Unlock() return } else { if reply.XTerm == -1 { rf.nextIndex[peerId] = reply.XIndex } else { lastIndexOfXTerm := rf.findLastIndexOfTerm(reply.XTerm) if lastIndexOfXTerm == -1 { rf.nextIndex[peerId] = reply.XIndex } else { rf.nextIndex[peerId] = lastIndexOfXTerm + 1 } } rf.mu.Unlock() } } }(peerId) } } Handle Append Entry 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 func (rf *Raft) HandleAppendEntries(args *AppendEnrtiesRequest, reply *AppendEnrtiesResponse) { rf.mu.Lock() defer rf.mu.Unlock() reply.Success = false reply.Term = rf.currentTerm reply.XTerm = -1 reply.XIndex = -1 reply.Xlen = -1 if len(rf.logs) == 1 { DPrintf("%v received AppendEntries from S%d (term=%d, prevLogIndex=%d, prevLogTerm=%d, entries=%d)\n", rf, args.LeaderID, args.Term, args.PrevLogIndex, args.PrevLogTerm, len(args.Entries)) } if args.Term < rf.currentTerm { if len(rf.logs) == 1 { DPrintf("%v received AppendEntries from S%d 137 return\n", rf, args.LeaderID) } return } if args.Term == rf.currentTerm && rf.state == CANDIDATE { rf.changeState(FOLLOWER) rf.persist() } if args.Term > rf.currentTerm { rf.changeState(FOLLOWER) rf.currentTerm = args.Term rf.persist() reply.Term = rf.currentTerm } rf.resetElectionTimer() lastLogIndex, _ := rf.getLastLog() if lastLogIndex < 
args.PrevLogIndex { reply.XIndex = lastLogIndex + 1 if len(rf.logs) == 1 { DPrintf("%v received AppendEntries from S%d 160 return, XIndex is %d\n", rf, args.LeaderID, reply.XIndex) } return } if rf.logs[args.PrevLogIndex].Term != args.PrevLogTerm { reply.XTerm = rf.logs[args.PrevLogIndex].Term for i := args.PrevLogIndex; i > 0; i-- { if rf.logs[i].Term != reply.XTerm { reply.XIndex = rf.logs[i].Index break } } return } if len(args.Entries) > 0 { // DPrintf("%v, rf.logs: %v, entries: %v\n", rf, rf.logs, args.Entries) i, j := args.PrevLogIndex+1, 0 for i < len(rf.logs) && j < len(args.Entries) && rf.logs[i].Term == args.Entries[j].Term { i += 1 j += 1 } if j < len(args.Entries) { rf.logs = append(rf.logs[:i], args.Entries[j:]...) rf.persist() // DPrintf("%v appends log entries: %v", rf, args.Entries[j:]) } } reply.Success = true newCommitIndex := min(args.LeaderCommit, len(rf.logs)-1) if newCommitIndex > rf.commitIndex { rf.commitIndex = newCommitIndex rf.applyCond.Signal() } } applySubmit 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 func (rf *Raft) applySubmit() { for !rf.killed() { rf.mu.Lock() for rf.commitIndex <= rf.lastApplied && !rf.killed() { rf.applyCond.Wait() } applyMsgs := []ApplyMsg{} for i := rf.lastApplied + 1; i <= rf.commitIndex; i++ { entry := rf.logs[i] applyMsg := ApplyMsg{ CommandValid: true, Command: entry.Command, CommandIndex: entry.Index, } applyMsgs = append(applyMsgs, applyMsg) rf.lastApplied += 1 } DPrintf("%v applies log up to index=%d", rf, rf.lastApplied) rf.mu.Unlock() for _, msg := range applyMsgs { rf.applyCh <- msg } } } 易错的点 首先还是手册里说到的,在append entry的时候,follower要只append不同的那一部分,我在这里就错过,导致在applySubmit中commitIndex会大于数组长度,出现越界。 ...

2025-10-08 · 5 分钟 · wen

6.5840lab3A raft leader election

Raft leader election 实验的实现

2025-10-08 · 3 分钟 · wen

6.5840lab2 Key/Value Server

Key/Value Server实验的实现

2025-07-28 · 3 分钟 · wen

MiT6.5840 lab1 MapReduce实现

lab1 MapReduce的实现

2025-07-22 · 4 分钟 · wen