MIT 6.824 Lab1 实验报告: MapReduce

摘要

本篇是 MIT 6.824 Lab1 的实验报告。MIT 6.824 是一门分布式系统的课程 ,最近打算系统地学习一下,我把课程资源放在了博客末尾,感兴趣的同学也可以一起来学。Lab 1 中,要求用 Golang 实现一个 MapReduce 的框架。

遵循课程规定,本文没有放出核心代码,只介绍了一些结构设计和流程思考,可放心食用。

实验准备

拉取代码

在实验指导书中 6.5840 Lab 1: MapReduce (mit.edu),包含了代码库的地址和一些简单介绍。

1
2
3
4
5
$ git clone git://g.csail.mit.edu/6.5840-golabs-2023 6.5840
$ cd 6.5840
$ ls
Makefile src
$

代码骨架介绍

src 为当前目录,Lab 1 里需要关注的是以下几个部分:

  • mr/:MapReduce 的核心逻辑,包含协调器(Coordinator)、Worker、RPC 三部分,需要自己实现。
  • mrapps/:MapReduce 的应用,用于测试,例如 wc.go 就是单词计数(WordCount)
  • main/mrsequential.go:MapReduce 的串行启动版本,可以参考一些文件读写的代码
  • main/mrcoordinator.go, main/mrworker.go:coordinator、worker 的启动类

理论知识

MapReduce 框架中,包含两种角色和两个阶段(两种任务)。

两个阶段:

  • Map 阶段:对分块文件运行 Map 函数
    • 函数签名:func Map(filename string, contents string) []mr.KeyValue
    • 接收文件名、文件内容为参数,返回 [key,value] 的列表
  • Reduce 阶段:对同一个 KEY 的 VALUE 聚合结果进行规约计算
    • 函数签名:func Reduce(key string, values []string) string
    • key 为 Map 阶段产生的某个 key,values 为 Map 阶段该 key 对应的所有 value 的列表
  • Reduce 阶段必须在所有 Map 任务都完成之后才能开始,否则会丢失数据

两种角色是 Coordinator 和 Worker:

  • Coordinator:负责协调任务的执行,将 Map/Reduce 任务下发给 Worker,监听 Worker 状态,如果 Worker 宕机对相关任务进行重新下发
  • Worker:负责任务的执行,以心跳与 Coordinator 周期交互

整体的流程如下:

  1. Coordinator 得到输入文件的切片
  2. Coordinator 向 Worker 发送 Map 任务
  3. Worker 上报 Map 任务执行完成,中间结果存储在 Intermediate Files 中
  4. 所有 Map 任务执行完毕后,Coordinator 对同一个 key 的中间结果进行聚合,下发 Reduce 任务
  5. Worker 上报 Reduce 任务执行完成,结果写到输出文件中
  6. 所有 Reduce 任务执行完毕,退出

需要注意的是,Worker 可能由于种种原因宕机,所以需要做容错处理。Coordinator 与 Worker 保持心跳,在发现心跳超时一定阈值后,需要把 Worker 标记为已下线,重新下发任务。

另外,考虑 Worker 宕机的时机,如果 Worker 在写入 Reduce 的输出结果时宕机,写了一半的文件可能会产生误解,指导书里的建议是先把结果写到临时文件中,等到运行结束再重命名。

RPC

首先需要思考的是,Coordinator 和 Worker 间的通信有几种。目前的架构下,只能由 Worker 主动向 Coordinator 发起请求,获得回应,消息种类可能是:

  • 请求一个任务
  • 上报心跳
  • 上报任务结果

需要在 RPC 文件里定义好这些请求的 request 和 reply,例如,任务请求的例子如下:

1
2
3
4
5
6
7
8
9
10
type AcquireTaskRequest struct {
WorkerCode string // worker id
}

type AcquireTaskReply struct {
Mode int8 // 任务模式
Files []string // 输入
NReduce int // reduce桶数量
TaskId string // 任务Id
}

设计上,worker 侧最好带有一个唯一的 code,方便追踪和排错,任务也是。此外,任务还需要指定一个执行模式,map/reduce/ 其他。由于 go 没有枚举类型,这里用一个 int8 表示。稍加设计,可以发现任务类型可分为四种:

  • map
  • reduce
  • wait:要求 worker 等待一段时间,后续可能会有任务
  • done;要求 worker 退出,后续不可能有任务了

类似地,可以定义心跳、任务结果上报的请求和响应,这里就省去了。考虑到上报结果和请求任务总是成对出现的,设计上也可以进行合并。

Coordinator

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
type Task struct {
Id string // 任务Id
Mode int8 // 运行模式
Files []string // 输入文件
WorkerCode string // 运行Worker的标识
LastBeatTime time.Time // 最后一次心跳时间
}

type Coordinator struct {
sync.Mutex // 互斥锁
Files []string // 分片文件
NReduce int // reduce 桶数
RunningTaskMap map[string]*Task // 运行中的任务
PendingTasks []*Task // 待运行的任务
MapDone bool // Map阶段是否结束
ReduceDone bool // Reduce阶段是否结束
ReduceFiles map[int][]string // Reduce桶的中间文件
LastWorkerBeatTime time.Time // 最后一次Worker心跳时间
}

Coordinator 包含两种 goroutine,rpc 处理的 goroutine、主 goroutine,需要加锁避免竞态条件、保证可见性。

Coordinator 的流程如下:

  • 初始化 Map 任务
  • 当 Worker 请求任务时:
    • 如果有待执行的 map / 任务,下发
    • 如果没有待执行的 map 任务,且还处在 map 阶段,要求 worker 等待
    • 如果在 reduce 阶段,下发待执行的 reduce 任务,不存在在要求 worker 等待
  • 当 Worker 上报任务结果时
    • 如果 map 任务全部执行完毕,切换到 reduce 阶段,初始化 reduce 任务
    • 如果 reduce 任务全部执行完毕,退出

此外,需要有一个后台 goroutine 扫描失联的任务,重新下发,这也是为什么没有任务时要求 worker 等待。

需要注意的是,在 Golang 中,由于没有 volatile 这样的字段可以保证可见性,因此要对于共享变量的读写都要加锁来保证可见性。由于 Coordinator 侧没有很重的业务计算逻辑,加锁时间不会太长,可以接受。

Worker

1
2
3
4
5
6
7
8
9
10
11
12
type worker struct {
sync.Mutex
WorkerCode string // worker id
TaskId string // task id
Mode int8 // 任务模式
InputFiles []string // 任务输入文件
ReduceFiles [][]string // reduce输出
Done bool // 任务执行结束
MapF func(string, string) []KeyValue // map函数
ReduceF func(string, []string) string // reduce函数
NReduce int // reduce 桶数量
}

Worker 侧存在两个 goroutine,任务执行和心跳维护。

在 Worker 侧的主逻辑里,需要重复执行:

  • 请求任务
  • 根据任务类型、执行任务并上报结果

在心跳维护组件里,每隔一段时间上报任务的心跳(任务 Id)。

可以发现,TaskId 作为共享变量、也需要加锁才能保证可见性。但是需要额外考虑的是,Worker 侧可能有耗时的计算逻辑,如果对函数整个加锁,心跳线程长时间获取不到锁就会阻塞,就出问题了。

因此,需要尽量缩小锁的范围锁的目的是保护共享变量的读写,分析可知,这种情况下,共享变量只有 TaskId,因此,可以抽一个函数,只对获取 TaskId 这一步加锁,就可以避免锁范围太大导致的问题。

课程资源