MIT 6.824 Lab2 实验报告: Raft

摘要

本篇是 MIT 6.824 Lab2 的实验报告,用 Golang 实现 Raft 强一致性协议,包含 leader 选举、日志同步、持久化、日志快重传、日志压缩几部分内容。

遵循课程规定,本文没有放出核心代码,只介绍了一些结构设计和流程思考,可放心食用。

理论知识

Raft 简介

Raft 是一种基于过半票决(Majority Vote)的分布式共识算法。过半票决是指,对于某个决定(例如选举谁做 Leader),只有当超过半数以上的节点都投票同意,才会生效。这里的半数是指所有节点中的半数(包括已经宕机、下线的节点)。一般设置服务器数量为奇数,因为偶数多的一台服务器是没有意义的。通常来说,2n+1 个节点可以最多容忍 n 个节点下线。

Raft 中,存在三种角色:Leader(至多一个)、Candidate、Follower。Leader 会接收应用请求,产生日志,将日志同步给 Follwer,当 Leader 发现超过半数的节点都正确接收了日志,就可以想应用层提交日志。

Raft 中,Leader 是以任期(term)为单位存在的。每个任期中至多只会有一个 leader,Leader 会通过心跳与 Follower 保持沟通。初始时,所有节点均为 Follower,每个 Follower 有一个选举计时器,当收到 leader 信息后会将其重置。若 follower 的选举计时器超时,则会将任期 + 1,开始一次 leader 选举,将自身的日志更新情况和任期等信息发送给其他节点,由它们投出赞成 / 反对票。若某个 follower 获取了超半数的赞成票,则它会成为 leader。

过半票决

过半票决可以避免脑裂。即使网络存在分区,也必然不可能有超过一个分区拥有过半数量的服务器,进而可以保证 leader 只有一个。这里背后更微妙的点在于,如果你总是需要过半的服务器才能完成任何操作,同时你有一系列的操作需要完成,其中的每一个操作都需要过半的服务器来批准,例如选举 Raft 的 Leader,那么每一个操作对应的过半服务器,必然至少包含一个服务器存在于上一个操作的过半服务器中。也就是说,任意两组过半服务器,至少有一个服务器是重叠的。实际上,相比其他特性,Raft 更依赖这个特性来避免脑裂。

这里可以用反证法思考。如果两组过半服务器,不重叠,交集为空,所以它们的并集节点数 > N,即不重复的节点数量大于 N,就有矛盾了

例如,当一个 Raft Leader 竞选成功,那么这个 Leader 必然凑够了过半服务器的选票,而这组过半服务器中,必然与旧 Leader 的过半服务器有重叠。所以,新的 Leader 必然知道旧 Leader 使用的任期号(term number),因为新 Leader 的过半服务器必然与旧 Leader 的过半服务器有重叠,而旧 Leader 的过半服务器中的每一个必然都知道旧 Leader 的任期号。类似的,任何旧 Leader 提交的操作,必然存在于过半的 Raft 服务器中,而任何新 Leader 的过半服务器中,必然有至少一个服务器包含了旧 Leader 的所有操作。这是 Raft 能正确运行的一个重要因素。

日志

在 Raft 中,每个应用请求都会关联创建一条日志(Log)。为什么要有日志?有以下几个原因:

  • 日志是 Leader 用来对操作排序的一种手段,这使得应用层的操作可以以某种正确的逻辑串行执行
  • 日志用来存放临时操作。Follower 收到了这些临时的操作,但是还不确定这些操作是否被 commit 了,只有当超过半数以上节点都收到,才会 commit,否则会丢弃
  • 日志可以用来持久化。Leader 需要在它的日志中记录操作,因为这些操作可能需要重传给 Follower,用于恢复状态

日志压缩

在 Raft 中,Log 压缩和快照解决的问题是:对于一个长期运行的系统,例如运行了几周,几个月甚至几年, Log 会持续增长。最后可能会有数百万条 Log,从而需要大量的内存来存储。如果持久化存储在磁盘上,最终会消耗磁盘的大量空间。如果一个服务器重启了,它需要通过重新从头开始执行这数百万条 Log 来重建自己的状态。当故障重启之后,遍历并执行整个 Log 的内容可能要花费几个小时来完成。这在某种程度上来说是浪费,因为在重启之前,服务器已经有了一定的应用程序状态。

所以,当 Raft 认为它的 Log 将会过于庞大,例如大于 1MB,10MB 或者任意的限制,Raft 会要求应用程序在 Log 的特定位置,对其状态做一个快照。所以,如果 Raft 要求应用程序做一个快照,Raft 会从 Log 中选取一个与快照对应的点,然后要求应用程序在那个点的位置做一个快照,并丢弃这个点之前的日志。

整体设计

本人的代码经过了一百多次测试,没有任何问题,下面我先介绍一下我的整体设计,供读者参考。

首先介绍一下 Raft 结构体。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
type Raft struct {
sync.Mutex // Lock to protect shared access to this peer's state
peers []*labrpc.ClientEnd // RPC end points of all peers
persister *Persister // Object to hold this peer's persisted state
me int // this peer's index into peers[]
dead int32 // set by Kill()
// common properties
applyCh chan ApplyMsg

// leader election
role int8
lastLeaderBeatTime time.Time
electionTimeout time.Duration
electionCond *sync.Cond
// Persistent state
currentTerm int // 当前 Term ID(初值为 0)
votedFor int // 该 Term 中已接收到来自该节点的选票的 Candidate ID
logs []LogEntry // 日志记录。第一个日志记录的 index 值为 1
lastIncludedIndex int // 日志的快照偏移,初始为0,也相当于已经在快照中的最大日志索引,可以用来判断日志进度
lastIncludedTerm int // lastIncludedIndex 对应的term

// all-server volatile states
commitIndex int // 最后一个已提交日志记录的 index(初值为 0)
lastApplied int // 最后一个已应用至上层状态机的日志记录的 index(初值为 0)
commitCond *sync.Cond
// leader volatile states
nextIndex []int // 每个节点即将为其发送的下一个日志记录的 index(初值均为 Leader 最新日志记录 index 值 + 1)
matchIndex []int // 每个节点上已备份的最后一条日志记录的 index(初值均为 0)
// service persistence
snapshot []byte
}

里面的内容是随着每次 lab 填充的,逐步增加功能即可。宏观上看,结构体内需要存储以下信息:

  • leader 选举的信息:节点的角色、选举计时器、超时时间等
  • 需要持久化的重要信息:任期、投票对象、日志、快照等
  • 提交信息:记录提交日志索引、上传应用层日志索引
  • leader 记录 follower 的信息:记录每个节点的下一个发送的日志索引、匹配的日志索引

goroutine

除了处理 RPC 的线程外,一个服务器节点要常驻几个后台线程呢?在我的设计里,是三个:

  • ticker:维护选举定时器,超时后发起选举
  • leaderHeartBeat:若当前节点是 leader,定期心跳、同步日志
  • asyncCommit:当 commitIndex>lastApplied 时,向应用层逐个提交日志

下面,我将逐个介绍。

ticker

ticker 单独启动一个线程应该是毋庸置疑的,问题在于如何设计选举定时器呢。在我的设计里,选举定时器由两部分组成:

  • 起始时间:从什么时候开始计时,对应 lastLeaderBeatTime
  • 超时时间:超过多久判定超时,对应 electionTimeout

ticker 会检查当前时间是否超过 lastLeaderBeatTime+electionTimeout,若是,则起一个异步线程开始选举。

leaderHeartBeat

leader 用来维护心跳和同步日志的线程。如果要做的精细一点,可以通过 sync.Cond 等到当前节点成为 leader 后再唤醒,或者粗暴一点直接 sleep 重试就好了。

asyncCommit

这个是有必要讲一下的。为什么要异步提交?因为我在做 2D 快照实验的时候发现了一个死锁的问题。当时我使用的是同步提交,逻辑示例如下:

1
2
3
4
5
6
7
8
9
func (rf *Raft) AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesReply) {
rf.Lock()
defer rf.Unlock()
// ...
// for log in toCommit
// do
// rf.applyCh <- log
// done
}

为了保证日志的串行提交,整个提交过程中是加锁同步的。在 2D 实验里,创建快照的过程示例如下:

1
2
3
4
5
6
7
8
func (rf *Raft) Snapshot(index int, snapshot []byte) {
rf.Lock()
defer rf.Unlock()
// ...
// truncate rf.logs
rf.lastIncludedIndex = index
// ...
}

这个过程中涉及到读写共享变量,也是加锁的,没有问题。

如果这么写的话,问题就来了,在 2D 里就会发现一个节点日志提交的好好的,突然就没信了,到最后只剩下一个节点在打印日志。加一些调试日志后会发现,出现了死锁,问题的根源在于测试的这一段代码(精简了部分逻辑):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (cfg *config) applierSnap(i int, applyCh chan ApplyMsg) {
cfg.mu.Lock()
rf := cfg.rafts[i]
cfg.mu.Unlock()

for m := range applyCh {
if m.SnapshotValid {
cfg.mu.Lock()
err_msg = cfg.ingestSnap(i, m.Snapshot, m.SnapshotIndex)
cfg.mu.Unlock()
} else if m.CommandValid {
if m.CommandIndex != cfg.lastApplied[i]+1 {
err_msg = fmt.Sprintf("server %v apply out of order, expected index %v, got %v", i, cfg.lastApplied[i]+1, m.CommandIndex)
}
if (m.CommandIndex+1)%SnapShotInterval == 0 { // 看这里!!!!
w := new(bytes.Buffer)
// ...
rf.Snapshot(m.CommandIndex, w.Bytes())
}
}

}
}

这里的 applyCh 即是应用层的 channel,raft 启动节点时,会启动一个线程执行这个函数,读取日志消息,根据类型进行处理。那么好,问题就来了。观察注释处,可以发现,生成快照和接收应用层消息,使用的是同一个 goroutine。当同步提交的过程中,如果触发了创建快照,就会停止接收 raft 日志,进而阻塞住同步提交

不幸的是,同步提交线程持有了 raft 的锁,这使得创建快照的线程一直阻塞在这个锁上,进而陷入死锁

粗暴一点解决,提交日志这个过程可以异步化。每次要提交时,先加锁读取状态,创建一个 goroutine 在后台提交,逻辑示意如下:

1
2
3
4
5
6
7
8
9
10
11
func (rf *Raft) AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesReply) {
rf.Lock()
defer rf.Unlock()
// ...
go func(){
// for log in toCommit
// do
// rf.applyCh <- log
// done
}()
}

看起来好像解决了问题,执行可能也不会出现问题,但是这段代码是有问题的。问题在于要多次提交时,无法严格保证顺序。如果有两个密集的 AppendEntries 到来,都要求提交日志,最后应用层收到的日志可能是交替的结果。在实验中,有要求心跳不能过于频繁,因此可能不会暴露出问题。

我认为正确的解决方法,是像我一样,将日志的 raft 提交和应用层提交区分开。AppendEntries 只改 commitIndex,后台线程发现 commitIndex>lastApplied,去向应用层提交。由于提交到应用层这一过程,只在单线程里发生,可以严格保证顺序。

此外,这种方案还有一个优点,就是避免应用层接收故障影响 raft 层。对于 leader 来说,如果它也是同步提交到应用层,那么如果应用层的接受出现问题,就会阻塞 leaderHeartbeat 线程,使得 leader 无法服务。这也是有问题。改为异步后台线程后,raft 层就不会受到影响。

2A leader 选举

这个实验要求实现 leader 的选举相关逻辑,以及 leader 的心跳功能(抑制选举)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type RequestVoteArgs struct {
// Your data here (2A, 2B).
Term int // Candidate 的 Term ID
CandidateId int // Candidate 的 ID
LastLogIndex int // Candidate 所持有的最后一条日志记录的 index
LastLogTerm int // Candidate 所持有的最后一条日志记录的 Term ID
}

// example RequestVote RPC reply structure.
// field names must start with capital letters!
type RequestVoteReply struct {
// Your data here (2A).
Term int // 接收方的 Term ID
VoteGranted bool // 接收方是否同意给出选票
}

处理流程如 raft 论文里描述的,

接收方在接收到该 RPC 后会进行以下操作:

  1. term < currentTerm,返回 false
  2. votedFor == null 且给定的日志记录信息可得出对方的日志和自己的相同甚至更新,返回 true

注意,这里的日志和自己的相同甚至更新,是通过最后一条日志的任期和索引判断的,也就是 LastLogTermLastLogIndex。逻辑是,先判断任期大的更新,任期相同,则日志索引大的更新。

还需要注意的是,下面这条规则是通用的,优先级最高的。

对于所有节点:

  • 若 RPC 请求或相应内容中携带的 term > currentTerm,则令 currentTerm = term,且 Leader 降级为 Follower

更详细一点描述,对于请求的 term

  • 如果 term > currentTerm,说明自身的任期落后,需要追上令 currentTerm = term,且 Leader 降级为 Follower,且将 voteFor 置空。因为在任期改变后,投票、角色信息都要清空
  • 如果 term<currentTerm,说明请求是过时的节点发出的,不予处理,并将自己的 currentTerm 写在响应中,让对方意识到自己落后了

建议抽为一个函数,集中处理这块的逻辑。

2A 整体还比较简单,没有太大的坑。

2B 日志

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
type AppendEntriesRequest struct {
Term int //Leader 的 Term ID
LeaderId int // Leader 的 ID
PrevLogIndex int // 在正在备份的日志记录之前的日志记录的 index 值
PrevLogTerm int // 在正在备份的日志记录之前的日志记录的 Term ID
Entries []LogEntry // 正在备份的日志记录
LeaderCommit int // Leader 已经提交的最后一条日志记录的 index 值
}

type AppendEntriesReply struct {
Term int // 接收方的当前 Term ID
Success bool // 当 Follower 能够在自己的日志中找到 index 值和 Term ID 与 prevLogIndex 和 prevLogTerm 相同的记录时为 true
XTerm int // term in the conflicting entry (if any)
XIndex int // index of first entry with that term (if any)
XLen int // log length
}

处理流程如 raft 论文里描述的:

接收方在接收到该 RPC 后会进行以下操作:

  1. term < currentTerm,返回 false
  2. 若日志中不包含 index 值和 Term ID 与 prevLogIndexprevLogTerm 相同的记录,返回 false
  3. 如果日志中存在与正在备份的日志记录相冲突的记录(有相同的 index 值但 Term ID 不同),删除该记录以及之后的所有记录
  4. 在保存的日志后追加新的日志记录
  5. leaderCommit > commitIndex,令 commitIndex 等于 leaderCommit 和最后一个新日志记录的 index 值之间的最小值

这里有一个坑是加粗的地方,只有在有冲突的情况下,才会对 follower 的节点进行截断处理。这里的冲突时指有相同的 index 值但 Term ID 不同,而不是 command 的值不同,注意,任何时候不能用 command 的值去做判断逻辑。Raft 本身对应用层应该是一无所知的。

最开始的话,容易想当然,如果前一个节点是匹配的,我直接把当前节点后面的都丢掉,全盘接收 leader 传过来的日志不就行了,为什么还要去判断冲突呢?原因是,follower 可能收到过期的 AppendEntries 请求,直接截断可能会导致丢弃掉部分日志,例如 follower 已经告诉 leader 自己成功接收的,就会出现问题。

这里还可能出现的问题是,data race。如果开了 -race flag,就会发现有时候会在 AppendEntries 处理过程中报 race,明明已经加了锁了。代码示例可能如下:

1
2
3
4
5
6
func (rf *Raft) AppendEntries(args *AppendEntriesRequest, reply *AppendEntriesReply) {
rf.Lock()
defer rf.Unlock()
// ...
rf.logs=append(rf.logs[:x], args.Entries...) // race
}

这是由于,在 go 中,切片的截断操作是复用底层数据的。Entries、logs 的切片截断,都会间接持有 logs 的引用,进而在某些区域出现读写 race 问题。解决方法是把 log 复制到别的地方,再做截断。

还有一个点,论文里的描述会晦涩一点:

对于 Leader:

如果存在一个值 N,使得 N > commitIndex,且大多数的 matchIndex[i] >= N,且 log[N].term == currentTerm,令 commitIndex = N

这个说的是,如果 leader 发现,超半数以上的节点都接收到了 N 索引以上的日志,且 N 日志是在当前任期产生的,那么就对 N 之前的日志进行提交。具体实现上,可以将 matchIndex 排序,取中值即可。

2C 持久化

正如论文里描述的,持久化的信息有三种

1
2
3
currentTerm       int        // 当前 Term ID(初值为 0)
votedFor int // 该 Term 中已接收到来自该节点的选票的 Candidate ID
logs []LogEntry // 日志记录。第一个日志记录的 index 值为 1

这个比较明确,接下来需要思考的是,什么时候持久化。

思考后不难想出,在这些变量发生变化之后,都需要持久化。其实再进一步思考,可以发现,raft 节点的变化,只有被其他节点感知到,才是有效的。换而言之,如果一个 raft 节点内部一直在变化状态,但是由于网络问题,这些状态无法传给其他节点,那么也无需保存了。再进一步,raft 间感知状态只有通过 rpc 调用这一种方式,因此可以在处理 rpc 的逻辑之后,进行持久化。

容易疏忽的是,在 leader 提交日志后,也需要持久化,这里相当于其他节点的 AppendEntries 了,需要对等处理。

日志的快恢复算法可以参考老师上课的讲义,描述的比较清楚了已经。

2D 日志快照

日志快照本身的思想很简单,但是实现逻辑却相当复杂。首先要思考一个问题,快照的 lastIncludedTerm/lastIncludedIndex 需要持久化嘛?

答案是要的,考虑特殊情况,如果一个节点内的所有日志都存到了快照里,本地一个日志都没有了。这个时候,来了一个投票请求,这个节点要怎么决定是否同意呢?也可以反过来,如果这个节点要开启一轮选举,怎么得到它的日志进度呢

因此,持久化这些信息是有必要的。这个实验最复杂的逻辑是:日志索引和 logs 位置的映射,日志找不到怎么处理。这一部分改起来会比较痛苦,如果前面没有预留一些设计空间的话,就需要重构。剩下还有一些历史逻辑,比如和持久化、快恢复的搭配,也需要思考清楚。

一些建议

下面是我个人对于这个 lab 的一些建议,实在一点:

  1. 绝对、绝对不要持有锁做任何可能阻塞的事情,包括不限于 RPC、channel 操作等。死锁往往就是这么发生的。
  2. 将需要加锁和不需要加锁的函数分开命名,例如 getMaxLogIndexWithLockgetMaxLogIndexWithoutLock。这是由于 go 的锁不支持重入,反复加锁也会死锁。分开命名可以提醒自己在不同的情况,调不同的函数。
  3. 最小化加锁逻辑。加锁仅限于共享变量的读写,读写完毕后即可释放。Raft 的正确性可以通过任期保证过期数据不会产生恶劣影响,无需担心。
  4. 多用 DPrintf,只有日志是你唯一的依靠(,建议用 nohup 把日志放在文件里,方便分析

最后做完这个 lab,能百余次通过测试,还是很有成就感的。希望本篇可以帮到大家!

参考