6.5840lab5

在这部分中我们要在lab4的基础上更近一步,lab4是只有一组raft服务,lab5就是Multiraft(multi-group),进一步加入了分片的机制,主要用于解决在大规模分布式系统中高可用、强一致性、可扩展性等问题。 单 Raft Group 的局限性: Raft 是一种用于实现分布式系统一致性的共识算法,通常以一个“Raft Group”(即一组节点共同维护一个日志副本)为单位工作。但在实际应用中,如果整个系统只用一个 Raft Group,有以下缺点: 写性能瓶颈: 所有的写请求必须由 Leader 单节点处理并定序,无法利用集群多节点的并发能力,吞吐量受限于单机资源。 扩展性受限: 增加节点仅能提升容灾能力,无法线性提升写性能,且集群总存储容量被限制在单机磁盘大小。 故障恢复慢与热点问题: 巨大的状态机导致快照生成和日志回放缓慢,故障恢复时间长;且单一 Group 难以进行细粒度的负载均衡,易形成读写热点。 分片 + 多 Raft Group: Multiraft 将整个数据集划分为多个分片(shard),每个分片由一个独立的 Raft Group 负责管理。 每个 Raft Group 只负责一部分数据,互不影响; 写/读请求可以并行处理,提高系统吞吐; 系统可以通过增加分片数量来横向扩展; 故障隔离:某个 Group 出现问题(如网络分区、节点宕机),不会影响其他 Group。 shardctrler 针对5A的实验,主要就是设计一个shardctrler,它的作用是维护集群的配置信息(Configuration),即负责管理“分片(Shard)”到“复制组(Replica Group / GID)”的映射关系,并在集群拓扑变化时自动进行分片的负载均衡(Rebalancing)。 具体来说,它需要通过 Raft 保证高可用和强一致性,并支持以下 4 个核心操作(RPC): Join (加入新组): 当新的复制组(Replica Group)加入集群时,Ctrler 需要将部分分片从现有组迁移给新组,以实现负载均衡。 Leave (移出旧组): 当某个复制组想要离开集群时,Ctrler 需要将其持有的分片重新分配给剩余的组,确保数据不丢失且分布均匀。 Move (手动迁移): 允许管理员强制将某个特定的分片(Shard)分配给特定的组(GID),主要用于调试或处理热点。 Query (查询配置): 允许 ShardKV 节点或 Client 查询最新的或历史版本的配置(Config),以便知道读写请求该发往哪里。 它本身一个group,所以基本的接收命令然后提交等操作和之前完全一样,主要就是这四个操作和rebalance,注意rebalance要最少移动原则。 ...

2025-12-12 · 8 分钟 · wen

6.5840lab4

这个部分就是在基于lab3的raft的基础上构建一个容错的、可复制的键值存储服务。其架构图在lab中也已经给出了,根据这个来做就可以了。 Client Client端的实现非常直观,和lab2的实现完全一样。本质上是一个屏蔽了网络故障和Leader切换的RPC重试循环。为了保证系统的线性一致性,每个客户端维护唯一的ClientID和单调递增的CommandID作为请求的唯一标识,每次发新的请求前要先把CommandID + 1,在收到明确的成功响应前,即便多次重试也保持CommandID不变,从而配合Server端进行去重。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 func (ck *Clerk) PutAppend(key string, value string, op string) { // You will have to modify this function. ck.CommandId += 1 args := PutAppendArgs{Key: key, Value: value, ClientId: ck.ClientId, CommandId: ck.CommandId} for { reply := PutAppendReply{} if !ck.servers[ck.LeaderId].Call("KVServer."+op, &args, &reply) || reply.Err != OK { ck.LeaderId = (ck.LeaderId + 1) % len(ck.servers) time.Sleep(retryInterval) continue } break } } Server KVServer 端的设计核心在于“共识层”与“状态机”的解耦协同。 也就是采用了**“异步提交 + 同步等待”**的模式:RPC Handler 收到请求后,调用 Raft 的 Start() 接口将命令写入日志,获得该日志的 index。由于 Raft 的共识过程是异步的,Handler 此时会创建一个通知通道(我在这里用的是waitCh),并以 index 为 Key 注册到等待列表中,随即阻塞等待。 ...

2025-12-05 · 3 分钟 · wen

6.5840lab3C

在 Raft 中,“持久化”操作可以分为两种: 包含快照数据的持久化 (Snapshot-based Persistence): 时机:主要时机是由上层应用(Application Layer)驱动的。当上层状态机(如一个键值存储)变得非常大时,它会主动调用 Raft 层的接口来生成快照,以释放内存并防止 Raft 日志无限增长。当 Follower 落后太多时,Leader 无法通过常规的 AppendEntries 来同步(因为日志已被截断),也会发送完整的快照给该 Follower。 内容:除了 Raft 的核心元数据,还会将这个完整的应用状态数据一起保存。这主要用于压缩旧的日志,避免其无限增长。 常规的持久化 (Regular Persistence): 时机:每次 currentTerm、votedFor 或 rf.logs(日志条目)发生改变时立即执行。 内容:只保存 Raft 协议自身的核心元数据: currentTerm (当前任期) votedFor (投给了谁) rf.logs (日志条目列表) 为什么需要常规的持久化? 因为这些核心元数据直接决定了集群的安全性。如果节点崩溃重启后丢失了这些信息,就可能导致严重错误,例如: 在同一任期内给多个候选人投票(破坏选举安全)。 忘记自己是 Leader 而发起无效选举。 丢失已接收的日志条目,导致数据不一致。 为什么是这三个数据? currentTerm、votedFor 和 logs是 Raft 算法能够安全运行的最小必要状态集合,它们共同构成了协议的“记忆”。currentTerm 作为集群的逻辑时钟,确保所有节点对“现在是什么时候”有一致的认知,防止过时的消息干扰当前决策;votedFor 则强制执行了选举安全的核心原则“一任一票”,通过持久化记录在哪个 Candidate 上投出了关键的一票,从根本上杜绝了同一任期内出现多个 Leader 的可能;而 rf.logs 存储了所有已接收但未必提交的日志条目,它是实现日志复制和状态机一致性基础,保证了即使节点崩溃,那些已经承诺要处理的命令不会丢失,并且新旧 Leader 能够基于相同的日志历史进行同步。这三者缺一不可,currentTerm 定义了时间线,votedFor 在该时间线上锁定了唯一的领导者合法性,rf.logs 则承载了需要被一致复制的具体状态变更,它们协同工作,确保了 Raft 在面对节点故障时依然能维持正确性和一致性。 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 func (rf *Raft) persist(snapshot []byte) { raftstate := rf.encodeState() if snapshot == nil { snapshot = rf.persister.ReadSnapshot() } rf.persister.Save(raftstate, snapshot) } // restore previously persisted state. func (rf *Raft) readPersist(data []byte) { rf.mu.Lock() defer rf.mu.Unlock() DPrintf(dPersist, "readPersist\n") // DPrintf("readPersist\n") if data == nil || len(data) < 1 { return } r := bytes.NewBuffer(data) d := labgob.NewDecoder(r) var term int var votedFor int var logs []LogEntry if d.Decode(&term) != nil || d.Decode(&votedFor) != nil || d.Decode(&logs) != nil { return } DPrintf(dPersist, "%v load len of log is %d\n", rf, len(logs)) rf.currentTerm = term rf.votedFor = votedFor rf.logs = logs rf.lastIncludedIndex = rf.logs[0].Index rf.lastIncludedTerm = rf.logs[0].Term rf.lastApplied = rf.lastIncludedIndex rf.commitIndex = rf.lastIncludedIndex } // the service says it has created a snapshot that has // all info up to and including index. this means the // service no longer needs the log through (and including) // that index. Raft should now trim its log as much as possible. func (rf *Raft) Snapshot(index int, snapshot []byte) { // Your code here (3D). rf.mu.Lock() defer rf.mu.Unlock() if rf.killed() || index <= rf.lastIncludedIndex || index > rf.lastApplied { return } newIndex := index - rf.lastIncludedIndex rf.lastIncludedTerm = rf.logs[newIndex].Term rf.lastIncludedIndex = index logs := make([]LogEntry, len(rf.logs[newIndex+1:])) copy(logs, rf.logs[newIndex+1:]) rf.logs = append([]LogEntry{{rf.lastIncludedIndex, rf.lastIncludedTerm, nil}}, logs...) rf.persist(snapshot) } 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 func (rf *Raft) InstallSnapshot(peerId int) { rf.mu.Lock() args := SnapshotArgs{ Term: rf.currentTerm, LeaderId: rf.me, LastIncludedIndex: rf.lastIncludedIndex, LastIncludedTerm: rf.lastIncludedTerm, Snapshot: rf.persister.ReadSnapshot(), } reply := SnapshotReply{} DPrintf(dSnap, "%v send snapshot to S%d\n", rf, peerId) rf.mu.Unlock() if !rf.sendSnapshot(peerId, &args, &reply) { return } rf.mu.Lock() defer rf.mu.Unlock() DPrintf(dSnap, "S%d get reply from S%d\n", rf.me, peerId) if args.Term != rf.currentTerm || rf.state != LEADER { return } if reply.Term > rf.currentTerm { rf.changeState(FOLLOWER) rf.currentTerm = reply.Term rf.persist(nil) return } rf.nextIndex[peerId] = rf.lastIncludedIndex + 1 rf.matchIndex[peerId] = rf.lastIncludedIndex } func (rf *Raft) HandleSnapshot(args *SnapshotArgs, reply *SnapshotReply) { rf.mu.Lock() reply.Term = rf.currentTerm if args.Term < rf.currentTerm { rf.mu.Unlock() return } if args.Term > rf.currentTerm { rf.changeState(FOLLOWER) rf.currentTerm = args.Term rf.persist(nil) reply.Term = rf.currentTerm } rf.resetElectionTimer() if args.LastIncludedIndex <= rf.lastIncludedIndex || args.LastIncludedIndex <= rf.commitIndex { rf.mu.Unlock() return } DPrintf(dSnap, "%v HandleSnapshot, leaderID: %d, LastIncludedIndex: %d, LastIncludedTerm: %d\n", rf, args.LeaderId, args.LastIncludedIndex, args.LastIncludedTerm) index := args.LastIncludedIndex - rf.lastIncludedIndex if index < len(rf.logs) && rf.logs[index].Index == args.LastIncludedIndex && rf.logs[index].Term == args.LastIncludedTerm { logs := make([]LogEntry, len(rf.logs[index+1:])) copy(logs, rf.logs[index+1:]) rf.logs = append([]LogEntry{{args.LastIncludedIndex, args.LastIncludedTerm, nil}}, logs...) } else { rf.logs = []LogEntry{{args.LastIncludedIndex, args.LastIncludedTerm, nil}} } rf.lastIncludedIndex = args.LastIncludedIndex rf.lastIncludedTerm = args.LastIncludedTerm rf.commitIndex = args.LastIncludedIndex rf.lastApplied = args.LastIncludedIndex rf.persist(args.Snapshot) rf.mu.Unlock() applyMsg := ApplyMsg{ SnapshotValid: true, Snapshot: args.Snapshot, SnapshotTerm: args.LastIncludedTerm, SnapshotIndex: args.LastIncludedIndex, } rf.applyCh <- applyMsg }

2025-12-01 · 3 分钟 · wen

Mit6.5840lab3B

3B感觉要难不少,也需要仔细阅读 手册 所需字段 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 type Raft struct { mu sync.Mutex // Lock to protect shared access to this peer's state peers []*labrpc.ClientEnd // RPC end points of all peers persister *Persister // Object to hold this peer's persisted state me int // this peer's index into peers[] dead int32 // set by Kill() state State // rf当前的状态 heartbeatTimer *time.Timer // 心跳timer electionTimeout *time.Timer // 超时选举 timer currentTerm int //当前的任期 votedFor int // 投票给了哪个节点 logs []LogEntry // 日志 nextIndex []int // 要传给节点的下一个index matchIndex []int // 节点已经匹配的index commitIndex int // 已经提交的index lastApplied int // 已经返回给客户端的index applyCond *sync.Cond applyCh chan ApplyMsg } Append Entry Leader 需要向所有的 Follower 发送日志,我在这里的就是将心跳和发送日志采用了同一个函数,每一次心跳都会调用requestAppendEntries()这个函数,但只有发现peer的日志落后时才会附带日志。为了避免单个网络差的 Follower 阻塞整个集群的日志提交进度, Leader 会为每一个 Peer 启动一个独立的后台协程。一旦发现 Follower 落后(nextIndex <= lastLogIndex),就立即构造 AppendEntries RPC 发送日志。如果不落后,则是一个空的心跳。对于有日志发送的rpc,直到Follower完全跟上Leader日志才会退出。 ...

2025-10-08 · 6 分钟 · wen

6.5840lab3A raft leader election

Key/Value Server实验的实现

2025-10-08 · 4 分钟 · wen

6.5840lab2 Key/Value Server

Key/Value Server实验的实现

2025-07-28 · 3 分钟 · wen

MiT6.5840 lab1 MapReduce实现

lab1 MapReduce的实现

2025-07-22 · 4 分钟 · wen

MIT6.5840 Mapreduce Paper

阅读MapReduce论文的总结记录

2025-07-21 · 1 分钟 · wen