这个实验应该是5个实验中最简单的了,要求是让我们实现一个简单的有容错机制的KV服务器。
我们要完成的代码文件有三个:
client.go: 给服务端发送信息,需要实现的有put,append,get三个函数,这三个函数都会对服务端发起请求,将对应的键值对存入服务端。
server.go: 接受client.go的存取服务,最终达到的效果是client在进行存取键值对的时候不会出错,就像在本地存取一样。
common.go: 负责封装client和server之间的通信的数据结构
线性一致性:在并发操作下,一个共享数据存储(如分布式数据库、键值存储)应该使得系统看起来就像只有一个数据副本,并且所有操作都是原子性地、即时地发生。
在本次实验中,因为服务端只有一个,所以加锁就可以实现线性一致性。
由于在不发生错误的情况下进行的键值对操作非常简单,这里不再多说。
三种操作#
我们需要在传输过程中可能发生错误的情况下完成以下三种操作。
Get(key): 接受key,返回对应的value值。这个函数是最好实现的,因为Get() 是幂等的(只读取状态,不修改),因此即使请求或回复丢失,重试即可。
Put(key, value): put添加数据,若key不存在则添加key和对应的value值,若key存在则覆盖。这个操作涉及到了对状态的修改,由于网络层的不确定性,会导致消息丢失(包括请求被丢弃,回复被丢弃)和重试。因此需要针对可能出现的问题进行额外的操作。
Append(key, arg):将arg追加到key对应的value中,同时返回旧的value。关于容错部分同Put()。
Put()和Append()我们通过为请求添加ID和在操作成功后发送Report请求这两种方式来实现容错。
- 为请求添加编号:可以使得
server可以知道多次的Put()操作是不是同一个,以避免多次放入相同的数据。但由于需要存储ID相关的信息,如果不及时释放掉,每一层新的操作都会在其中增加一条记录,这可能会导致内存爆掉。因此需要Report操作。
Report请求:在我们server成功完成client的请求并成功将请求传达给client后,client会像server发送Report请求,告诉server,刚刚那个操作涉及到的衍生数据可以删掉了。
具体实现#
common.go#
因为Get()和Put()及Append()不同,所以我们的通信时所携带的信息也是不同的。Get()由于幂等,不需要过多的信息:
1
2
3
4
5
6
7
|
type GetArgs struct {
Key string
}
type GetReply struct {
Value string
}
|
PutAppend()的通信相关数据结构如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
type RequestType byte
const (
Request RequestType = iota
Report
)
// Put or Append
type PutAppendArgs struct {
Key string
Value string
ArgId int64 // ID
ArgType RequestType // 请求类型
}
type PutAppendReply struct {
Value string
}
|
client.go#
client.go的实现非常的简单,对于Get(),只需要重复请求直到顺利成功即可,具体代码如下:
1
2
3
4
5
6
7
8
9
|
func (ck *Clerk) Get(key string) string {
args := GetArgs{key}
reply := GetReply{}
for !ck.server.Call("KVServer.Get", &args, &reply) {
}
return reply.Value
}
|
对于PutAppend(),我们为每一次操作添加一个独一无二的ID,之后也循环请求。直到成功后,再向Server端发送Report请求,申请将衍生数据删掉。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
func (ck *Clerk) PutAppend(key string, value string, op string) string {
argID := nrand() // 代码框架中帮我实现好了的一个随机函数,可以用这个函数获得ID
args := PutAppendArgs{Key: key, Value: value, ArgId: argID, ArgType: Request}
reply := PutAppendReply{}
for !ck.server.Call("KVServer."+op, &args, &reply) {
}
res := reply.Value
args.ArgType = Report
for !ck.server.Call("KVServer."+op, &args, &reply) {
}
return res
}
func (ck *Clerk) Put(key string, value string) {
ck.PutAppend(key, value, "Put")
}
// Append value to key's value and return that value
func (ck *Clerk) Append(key string, value string) string {
return ck.PutAppend(key, value, "Append")
}
|
Server.go#
我们需要维护的数据结构如下:
1
2
3
4
5
6
|
type KVServer struct {
mu sync.Mutex
data map[string]string // 存储客户端发来的KV
Requests map[int64]string // 存储每一个操作的ID
}
|
对于Get()操作,我们读取返回即可。
1
2
3
4
5
6
7
8
9
|
func (kv *KVServer) Get(args *GetArgs, reply *GetReply) {
key := args.Key
kv.mu.Lock()
defer kv.mu.Unlock()
value := kv.data[key]
reply.Value = value
}
|
对于Put(),我们需要检查一下该次请求类型:
- 若为
Request:则检查该次操作的ID是否存在,若存在,则证明已经操作过了,直接返回即可,若不存在,则加入ID,同时加入客户端发来的数据。
- 若为
Report:则删除Requests中对应的数据即可。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
|
func (kv *KVServer) Put(args *PutAppendArgs, reply *PutAppendReply) {
key := args.Key
value := args.Value
ID := args.ArgId
argType := args.ArgType
kv.mu.Lock()
defer kv.mu.Unlock()
if argType == Request {
_, exist := kv.Requests[ID]
if exist {
return
}
kv.data[key] = value
kv.Requests[ID] = value
} else {
delete(kv.Requests, ID)
}
}
|
Append()操作同Put()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
|
func (kv *KVServer) Append(args *PutAppendArgs, reply *PutAppendReply) {
// Your code here.
key := args.Key
value := args.Value
ID := args.ArgId
argType := args.ArgType
kv.mu.Lock()
defer kv.mu.Unlock()
if argType == Request {
oldValue, exist := kv.Requests[ID]
if exist {
reply.Value = oldValue
return
}
oldValue = kv.data[key]
kv.data[key] = oldValue + value
reply.Value = oldValue
kv.Requests[ID] = oldValue
} else if args.ArgType == Report {
delete(kv.Requests, ID)
}
}
|
lab1和lab2其实一个多月前就写完了,但那会没打算写博客。lab3由于实验和论文到现在都没动,今天论文实在写不下去了,想起来lab2的内容还没写,就写一下把。希望放假前能把论文投出去,后面就有时间把后三个lab做掉了。