MIT-6.824 Lab3: 基于 Raft 的 KV 数据库

摘要

本篇是 MIT 6.824 的 lab3,基于之前构建的 Raft 层的容错、线性一致性的能力,实现一个 KV 数据库。

遵循课程规定,本文没有放出核心代码,只介绍了一些结构设计和流程思考,可放心食用。

整体架构

KV 数据库是典型的客户端 - 服务端架构,结构图如下(来自官网):

  • 客户端:GET/PUT/APPEND 请求的发起者,期望得到一个线性一致的结果
  • 服务端:基于 Raft 的分布式存储,屏蔽了内部细节,对外表现的像是一个可靠的单机存储

这里的线性一致是指,多个客户端发起请求时,某个客户端得到的结果,对其他客户端是立刻可见的。比如客户端 A 发起了 put x 1,只要它得到了正确处理的结果,任何其他客户端发起 get x 得到的一定是 1。这是通俗一点的解释方法。来自维基百科的,更学术一点的定义:

In concurrent programming, an operation (or set of operations) is linearizable if it consists of an ordered list of invocation and response events, that may be extended by adding response events such that:

  1. The extended list can be re-expressed as a sequential history (is serializable).
  2. That sequential history is a subset of the original unextended list.

也可以参考分布式 - 共识、线性一致性与顺序一致性 - 叽叽喳喳 - SegmentFault 思否

二者之前的通信基于 RPC:

  • 客户端 API:GET/PUT/APPEND 方法,反复重试直到找到对应的服务端 leader 节点,发起请求
  • 服务端逻辑:如果不是 leader,返回报错,否则阻塞到请求成功执行,返回执行结果

需要考虑的问题

重复请求的幂等性:由于网络不稳定,可能会有应用层的重发,客户端需要给请求带唯一 id,服务端需要做幂等处理

  • 注意,这个要和 TCP 的 “不重不漏” 区分开。假设传输层使用 TCP 协议,也只能保证应用层的报文被正常接收(应答方返回 ACK),并不能对应用层的逻辑作任何保证,必须得收到应用层的响应才能认为请求得到处理,收不到就需要重发。

客户端

客户端的逻辑比较清晰,需要缓存一个 leaderId 记录上次成功处理请求的 server,避免每次都要轮询所有 server。为了支持幂等性,需要有客户端 id、序列号 id 两个字段,唯一标识一个客户端请求。对应地、服务端需要保存每个客户端处理过的最大请求 id,在处理前先确认没有处理过,进而实现幂等性。

在这种场景下,要求每个客户端只能以单线程地阻塞发起请求,才可以保证幂等性。反之,如果客户端 0 同时发起了 1、2 两个请求,2 被正确处理了,1 丢失没有到达 server。等 1 的重传报文到达 server,server 发现处理完的最大请求 id 是 2>1,进而拒绝处理,就出问题了。

1
2
3
4
5
6
7
type Clerk struct {
servers []*labrpc.ClientEnd
// You will have to modify this struct.
leaderId int // leader id (client 的角度)
id int64 // clerk uuid
seqId int64 // 从 1 开始自增的请求Id
}

这里需要注意的一点是,客户端和服务端的角度来看servers 的顺序是不一致的。我最开始想做的一个优化是,server 在收到请求,发现自己不是 leader 之后,把正确 server 的 id 发给客户端,客户端直接向对应的 id 发请求,不必轮询所有 server,实际运行的时候却出现了问题。观察代码可以发现,测试中新建客户端时,做了乱序处理:

1
ck := MakeClerk(random_handles(ends))

这样的设计进一步解耦了 client 和 server,在真实系统里,client 和 server 的访问策略和机制可能是不一致的,两者间不应该产生依赖。

这里有个坑点是,key 不存在的时候,client 应该返回空字符串,我最开始返回了 ErrNoKey,报了奇怪的线性不一致问题,对着 procupine 的图反复研究,有点乐。

服务端

结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type KVServer struct {
deadlock.Mutex
me int
rf *raft.Raft
applyCh chan raft.ApplyMsg
dead int32 // set by Kill()

maxraftstate int // snapshot if log grows this big

// Your definitions here.
// kv
maxRaftSize int // Raft 最大状态
db map[string]string // 数据存储
seen map[int64]int64 // client对应的max Seq
// 维护提交状态
lastApplied int // 最后提交的 log id
applyCond *deadlock.Cond // apply 条件
// persistence
persister *raft.Persister
}

服务端的逻辑如下,当接收到一个 PUT/GET/APPEND 请求:

  1. 判断自己是否是 leader,若否,返回错误
  2. 将请求提交给 raft
  3. 等待 raft 将请求提交至应用层,执行完毕 / raft 的 leader 情况发生变化
  4. 返回结果

处理 raft leader 变化

需要注意的是,在等待 raft 提交请求的过程中,可能会 leader 情况发生变化,这种情况要返回错误。处理不当可能会有问题。例如,我最初是这样等待请求处理完毕的:

1
2
3
4
5
index, _, isLeader := kv.rf.Start(op)
for isLeader && kv.lastApplied < index {
kv.applyCond.Wait()
}
// ...

这样没有处理 leader 变化的情况,指导书里提到可以这样处理,当节点检查到一个不同的请求已经出现在 Start 返回的 index 位置,即比较两个的请求内容是否相同,若不同,则说明其他节点成为了 leader,同步了不一样的日志过来。

我觉得这种做法可以改进,这种做法需要等待:

  1. 当前节点收到更大的 term,变为 follower
  2. 当前节点收到 leader 的 index 日志
  3. 当前节点提交 index 日志至应用层

之后,才会意识到自己不是 leader 了。但实际上,早在第一步就可以发现错误了。可以当 Raft 的 leader 状态发生变化时,提交一条特殊的日志到应用层,触发检查,即可及早返回错误。

幂等性

当 server 处理请求之前,需要先判断 seqId 是否大于已处理同一 client 的最大 seqId:

  • 若是,则处理请求,更新最大 id
  • 若否,丢弃请求

快照

在 lab 2 中,raft 层实现了快照功能,本次需要在应用层对 raft 的状态进行判断,即将到达要求时,在应用层生成快照,并提交给 raft 处理。需要考虑是快照里要包含哪些信息:

  • 数据库的键值对,这个是毋庸置疑的
  • 每个 client 已处理的最大请求 Id,避免重启后处理重复消息

一些问题

Operations completed too slowly

lab3 测试中可能会出现这个错,一般原因是,在 Start 中没有立刻同步日志,只有被动地依赖 Raft 心跳进行传输,使得操作处理速率受限于心跳频率。解决方法当然是,在 Start 后立刻异步同步日志。

细心的你会发现,Start 和心跳可能会同时同步日志,那么会不会有什么问题呢?当然是有的!问题表现的像是收到了乱序的网络回复。我们知道,AppendEntries 调用可能会有以下两种结果:

  • 失败,leader 降低 nextIndex 重试,可能发起 InstallSnapshot
  • 成功,leader 调高 nextIndex, matchIndex

假设出现了这样的执行顺序:

  1. 心跳线程,发起 AppendEntries
  2. Start 发起 AppendEntries
  3. 心跳线程,失败,降低 nextIndex
  4. 心跳线程,InstallSnapshot 成功,增大 nextIndex > lastIncludedIndex
  5. Start 收到结果,失败,降低 nextIndex
  6. 心跳线程,InstallSnapshot 成功后尝试同步快照后的日志,期望 nextIndex > lastIncludedIndex发现不符合,出错

当然,这个问题跟具体实现有关,我的实现中,失败后会依次完成 InstallSnapshot,成功后继续 AppendEntries,由于发起 RPC 过程中释放了锁,导致其他线程修改了 nextIndex,进而出错。

解决方法是,进行兼容处理,所有的,由于 RPC 中途释放的过程,形如:

1
2
3
4
5
6
7
8
rf.Lock()
v:=rf.nextIndex
rf.Unlock()
server.Call()
rf.Lock()
// should use v rather than rf.nextIndex
// should check v==rf.nextIndex
rf.Unlock()

在第一次加锁中,需要读取共享变量的值,第二次加锁时,需要选择:

  • 使用第一次读到的值,不要读新值,更新时要兼容更新(例如取 max)
  • 检查新值旧值是否相同,再决定要不要进行剩下的逻辑

具体选哪种要看操作类型,比如我上面举的例子,当应该判断新的 nextIndex 是否合法,不合法就退出执行。

此外,在接收端,即处理 AppendEntriesInstallSnapshot 时,也需要做乱序请求的兼容,不过由于处理时基本一直加锁,只需在处理前判断即可。

心跳可以不传数据吗

我不建议这样做。Start 只有在应用层新增请求的时候会使用。如果心跳不传数据,可能会导致日志同步过慢。例如 leader 收到新日志 1 后,还没同步到过半节点,宕机了。新 leader 上线后,由于应用层迟迟没有新请求到来,日志 1 迟迟得不到同步,也不好。如果心跳也可以同步日志,就可以保证了一个最低同步时延。

Start 怎么同步日志

我建议不要每次都起 goroutine,而是为每个 peer 维护一个后台同步线程,Start 只是唤醒这个线程执行。虽然 goroutine 创建和销毁的开销不大,但是,频繁 Start 场景下,每次 Start 都会开启 n 个 goroutine,这:

  • 不必要:如果前一个日志同步没有得到响应(可能节点下线),发起新的同步意义不大
  • 存在重复传输:前一个日志同步没有得到响应,每次新的同步都会带有 nextIndex 之后的全量日志,有很多是重复的
  • 存在锁的争用和潜在的并发问题

总结

本次 lab 整体不难,就是可能 Raft 有些问题,需要再返回去排查改错。如果 Raft 模块设计的不好,再回过头改动会很痛苦。