In Raft, "persistence" operations fall into two categories:
1. Snapshot-based persistence:
   - Timing: driven mainly by the application layer. When the upper-level state machine (for example, a key-value store) grows large, it actively calls into the Raft layer to create a snapshot, freeing memory and preventing the Raft log from growing without bound. The leader also sends a complete snapshot to a follower that has fallen so far behind that it can no longer be caught up with ordinary AppendEntries RPCs (the required log entries have already been truncated).
   - Content: the full application state is saved alongside Raft's core metadata. This is mainly used to compact old log entries and keep the log from growing indefinitely.
2. Regular persistence:
   - Timing: performed immediately whenever currentTerm, votedFor, or rf.logs (the log entries) changes; see the sketch right after this list for typical call sites.
   - Content: only Raft's own core metadata is saved:
     - currentTerm (the current term)
     - votedFor (which candidate received this node's vote)
     - rf.logs (the list of log entries)
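As a concrete illustration of the "persist immediately on change" rule, the sketch below shows typical call sites. The helper names (stepDownToFollowerLocked, appendCommandLocked) are hypothetical; rf.persist(nil) is the metadata-only form used throughout the code later in this post.

```go
// Minimal sketch (not the post's full implementation): every mutation of
// currentTerm, votedFor, or rf.logs is followed by rf.persist(nil) while the
// lock is still held, so a crash can never observe the new state before it
// reaches stable storage. The helper names below are hypothetical.
func (rf *Raft) stepDownToFollowerLocked(term int) {
	rf.currentTerm = term // currentTerm changed
	rf.votedFor = -1      // votedFor reset for the new term
	rf.persist(nil)       // persist metadata before releasing the lock or replying
}

func (rf *Raft) appendCommandLocked(command interface{}) {
	last := rf.logs[len(rf.logs)-1]
	rf.logs = append(rf.logs, LogEntry{last.Index + 1, rf.currentTerm, command}) // rf.logs changed
	rf.persist(nil)
}
```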
Why is regular persistence needed? Because this core metadata directly determines the safety of the cluster. If a node loses it after a crash and restart, serious errors can follow, for example:
- Voting for more than one candidate in the same term (breaking election safety).
- Forgetting that it was the leader and starting an unnecessary election.
- Losing log entries it had already accepted, causing data inconsistency.

Why exactly these three pieces of state?
currentTerm, votedFor, and logs form the minimal set of state Raft needs to run safely; together they are the protocol's "memory". currentTerm acts as the cluster's logical clock, ensuring all nodes agree on "what time it is" and preventing stale messages from interfering with current decisions. votedFor enforces the core election-safety rule of one vote per term: by durably recording which candidate received this node's vote, it rules out the possibility of two leaders emerging in the same term. rf.logs stores every log entry the node has accepted, whether or not it has been committed; it is the foundation of log replication and state-machine consistency, guaranteeing that commands the node has promised to handle are not lost across a crash, and that old and new leaders can reconcile from the same log history. None of the three can be omitted: currentTerm defines the timeline, votedFor pins down the unique legitimate leader on that timeline, and rf.logs carries the concrete state changes that must be replicated consistently. Working together, they let Raft preserve correctness and consistency in the face of node failures.
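The persist and readPersist code below serializes exactly these three fields. The encodeState helper it calls is not shown in this section; a plausible sketch, assuming the lab's labgob encoder, looks like this (the encode order must match the Decode order in readPersist):

```go
// Plausible sketch of the encodeState helper referenced by persist(): encode
// only the three fields that must survive a crash, in the same order that
// readPersist decodes them.
func (rf *Raft) encodeState() []byte {
	w := new(bytes.Buffer)
	e := labgob.NewEncoder(w)
	e.Encode(rf.currentTerm)
	e.Encode(rf.votedFor)
	e.Encode(rf.logs)
	return w.Bytes()
}
```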
The persistence and snapshot-trimming code:

```go
func (rf *Raft) persist(snapshot []byte) {
	raftstate := rf.encodeState()
	// Called with nil for metadata-only changes: keep the snapshot that is
	// already on disk instead of overwriting it with nothing.
	if snapshot == nil {
		snapshot = rf.persister.ReadSnapshot()
	}
	rf.persister.Save(raftstate, snapshot)
}

// restore previously persisted state.
func (rf *Raft) readPersist(data []byte) {
	rf.mu.Lock()
	defer rf.mu.Unlock()
	DPrintf(dPersist, "readPersist\n")
	if data == nil || len(data) < 1 {
		return
	}
	r := bytes.NewBuffer(data)
	d := labgob.NewDecoder(r)
	var term int
	var votedFor int
	var logs []LogEntry
	if d.Decode(&term) != nil || d.Decode(&votedFor) != nil || d.Decode(&logs) != nil {
		return
	}
	DPrintf(dPersist, "%v load len of log is %d\n", rf, len(logs))
	rf.currentTerm = term
	rf.votedFor = votedFor
	rf.logs = logs
	// logs[0] is the dummy entry recording the last snapshotted index/term.
	rf.lastIncludedIndex = rf.logs[0].Index
	rf.lastIncludedTerm = rf.logs[0].Term
	rf.lastApplied = rf.lastIncludedIndex
	rf.commitIndex = rf.lastIncludedIndex
}

// the service says it has created a snapshot that has
// all info up to and including index. this means the
// service no longer needs the log through (and including)
// that index. Raft should now trim its log as much as possible.
func (rf *Raft) Snapshot(index int, snapshot []byte) {
	// Your code here (3D).
	rf.mu.Lock()
	defer rf.mu.Unlock()
	if rf.killed() || index <= rf.lastIncludedIndex || index > rf.lastApplied {
		return
	}
	newIndex := index - rf.lastIncludedIndex
	rf.lastIncludedTerm = rf.logs[newIndex].Term
	rf.lastIncludedIndex = index
	// Drop everything up to and including index, keeping a fresh dummy entry
	// at position 0 so index arithmetic stays uniform.
	logs := make([]LogEntry, len(rf.logs[newIndex+1:]))
	copy(logs, rf.logs[newIndex+1:])
	rf.logs = append([]LogEntry{{rf.lastIncludedIndex, rf.lastIncludedTerm, nil}}, logs...)
	rf.persist(snapshot)
}
```

The leader-side sender and follower-side handler for the InstallSnapshot RPC:

```go
// InstallSnapshot is run by the leader to bring a follower that has fallen
// behind the trimmed log up to date.
func (rf *Raft) InstallSnapshot(peerId int) {
	rf.mu.Lock()
	args := SnapshotArgs{
		Term:              rf.currentTerm,
		LeaderId:          rf.me,
		LastIncludedIndex: rf.lastIncludedIndex,
		LastIncludedTerm:  rf.lastIncludedTerm,
		Snapshot:          rf.persister.ReadSnapshot(),
	}
	reply := SnapshotReply{}
	DPrintf(dSnap, "%v send snapshot to S%d\n", rf, peerId)
	rf.mu.Unlock()
	if !rf.sendSnapshot(peerId, &args, &reply) {
		return
	}
	rf.mu.Lock()
	defer rf.mu.Unlock()
	DPrintf(dSnap, "S%d get reply from S%d\n", rf.me, peerId)
	// The term or role may have changed while the RPC was in flight.
	if args.Term != rf.currentTerm || rf.state != LEADER {
		return
	}
	if reply.Term > rf.currentTerm {
		rf.changeState(FOLLOWER)
		rf.currentTerm = reply.Term
		rf.persist(nil)
		return
	}
	// Use the args values rather than rf.lastIncludedIndex: the leader's own
	// snapshot may have advanced since the RPC was sent, while the follower
	// only holds the state described in args.
	rf.nextIndex[peerId] = args.LastIncludedIndex + 1
	rf.matchIndex[peerId] = args.LastIncludedIndex
}

// HandleSnapshot is the follower-side RPC handler.
func (rf *Raft) HandleSnapshot(args *SnapshotArgs, reply *SnapshotReply) {
	rf.mu.Lock()
	reply.Term = rf.currentTerm
	if args.Term < rf.currentTerm {
		rf.mu.Unlock()
		return
	}
	if args.Term > rf.currentTerm {
		rf.changeState(FOLLOWER)
		rf.currentTerm = args.Term
		rf.persist(nil)
		reply.Term = rf.currentTerm
	}
	rf.resetElectionTimer()
	// Ignore snapshots that do not move this node forward.
	if args.LastIncludedIndex <= rf.lastIncludedIndex || args.LastIncludedIndex <= rf.commitIndex {
		rf.mu.Unlock()
		return
	}
	DPrintf(dSnap, "%v HandleSnapshot, leaderID: %d, LastIncludedIndex: %d, LastIncludedTerm: %d\n",
		rf, args.LeaderId, args.LastIncludedIndex, args.LastIncludedTerm)
	index := args.LastIncludedIndex - rf.lastIncludedIndex
	if index < len(rf.logs) && rf.logs[index].Index == args.LastIncludedIndex && rf.logs[index].Term == args.LastIncludedTerm {
		// The snapshot point exists in our log: keep the suffix after it.
		logs := make([]LogEntry, len(rf.logs[index+1:]))
		copy(logs, rf.logs[index+1:])
		rf.logs = append([]LogEntry{{args.LastIncludedIndex, args.LastIncludedTerm, nil}}, logs...)
	} else {
		// Our log conflicts with or is shorter than the snapshot: discard it.
		rf.logs = []LogEntry{{args.LastIncludedIndex, args.LastIncludedTerm, nil}}
	}
	rf.lastIncludedIndex = args.LastIncludedIndex
	rf.lastIncludedTerm = args.LastIncludedTerm
	rf.commitIndex = args.LastIncludedIndex
	rf.lastApplied = args.LastIncludedIndex
	rf.persist(args.Snapshot)
	rf.mu.Unlock()
	// Hand the snapshot to the service outside the lock.
	applyMsg := ApplyMsg{
		SnapshotValid: true,
		Snapshot:      args.Snapshot,
		SnapshotTerm:  args.LastIncludedTerm,
		SnapshotIndex: args.LastIncludedIndex,
	}
	rf.applyCh <- applyMsg
}
```
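One piece the listings above leave out is how the leader decides between AppendEntries and InstallSnapshot. A common arrangement, sketched below with a hypothetical replicateEntries helper, is to compare the follower's nextIndex against the leader's lastIncludedIndex inside the replication loop:

```go
// Hypothetical call site inside the leader's replication/heartbeat loop
// (rf.mu held): if the entries the follower needs have already been trimmed
// away by a snapshot, fall back to InstallSnapshot.
if rf.nextIndex[peerId] <= rf.lastIncludedIndex {
	go rf.InstallSnapshot(peerId) // follower is too far behind; send the snapshot
} else {
	go rf.replicateEntries(peerId) // normal AppendEntries path (hypothetical name)
}
```

Because InstallSnapshot acquires rf.mu itself, dispatching it in a goroutine from code that already holds the lock is consistent with the implementation above.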