type
status
date
slug
summary
tags
category
icon
password
Lab 地址:6.824 Lab 1: MapReduce
一个中文翻译:MapReduce:在大型集群上简化数据处理
MIT6.824 的第一节课要做的 Lab 就是大名鼎鼎的 Google 分布式系统三驾马车之一的 MapReduce(另外两个是 GFS 和 BigTable),课程需要我们阅读 MapReduce 的论文后实现一个简化版的 MapReduce Framework
概览
首先应该先看论文,再
clone
项目,看看涉及 Lab1 部分的结构,弄清楚要基于什么去完成实验从用户的角度来看,应该看我们将要实现的东西能给 Client 提供怎样的抽象。首先
/mrapps
中的就是使用 MapReduce 库的 Clients,例如 Getting started 中的 /mrapps/wc.go
是进行词频统计的,MapReduce 库需要用户实现 Map 和 Reduce 两个函数:它们的语义和论文中的一致,不同的是,这里在
Map
中没有论文中的 Emit
操作,而是要求用户返回一个 key-value 列表,列表的每一项就相当于一个 Emit
的 key-value 对最后将 Client 通过
go build -buildmode=plugin xxx.go
编译为动态链接库,启动一个 MapReduce Job 时和输入文件一起作为参数传入实验提供了一个简单的顺序单机 MapReduce 实现在
mrsequential.go
内,可以看下它的实现,有助于理解执行过程。其首先加载了被编译为动态链接库的用户代码:然后加载输入文件传递给用户代码的
Map
执行,将执行的中间结果存储起来Map
执行完后对中间结果排序最后调用
Reduce
并写入结果文件总体流程基本对应论文的图 1 的各个阶段,只不过是单机顺序实现,没有调度和网络部分
然后我们需要实现自己的 MapReduce 库在
mr/master.go
,mr/worker.go
,mr/rpc.go
三个文件中,里面已经有一些定义好的方法。要实现一些细节和要注意的点包括:- 一个 Master,一个或多个 Worker,通过 RPC 通信。在开始一个 MapReduce Job 时,通常先启动 Master,然后启动一个或多个 Worker(通过
main/mrmaster.go
和main/mrworker.go
)
- 容错机制,包括 Worker、Master 宕机的处理
Master 的宕机可以和论文一样,交给客户端重试,不同之处是论文中持久化了 checkpoint 使得 Master 在宕机恢复后可以恢复任务进度,但 Lab 中没有要求我们实现;对于 Worker,如果执行任务超时,则 Master 将认为它宕机或执行缓慢,此时重新分配任务到另一个 Worker
另一方面,应该确保宕机时不会出现文件部分写入的情况,一个方法是在文件写入后重命名(大部分操作系统的文件系统实现中,重命名是原子操作)
- 同论文一样,
Map
的输出(中间结果)也应该写入到文件中,然后划分成多个 bucket,这保证相同 key 都在同一文件中,能在同一 Worker 里被打包到Reduce
执行,这个分区数量可以通过启动 Master 时的参数指定
- 每个
Reduce
的结果输出到单独的文件上
- Master 应该实现
Done
以明确地告知客户端 MapReduce 是否执行完毕
- 结束后应该正确地回收每个 Worker 进程
参考论文第 3 节,捋一下我们的流程:
- 首先启动 Master,然后启动一个或多个 Worker,Worker 启动时向 Master 获取任务
- Master 为 Worker 分配任务后,要监视其是否完成或超时,超时后将任务分配给其他 Worker。任务一开始只应该分配
Map
,所有Map
执行完成后才分配Reduce
- 当 Worker 完成一个任务后,继续获取任务
Map
中,Master 需要传递给 Worker 输入文件的位置,Worker 执行Map
完后写入中间结果到本地并进行分区,然后返回位置给 Master
- Master 需要等待
Map
都执行完毕才分配Reduce
,为 Worker 分配Reduce
时要传递同一分区的中间结果文件的位置,Worker 读取文件并进行排序以将相同的 key 排到一起,然后将相同的 key 对应的 value 集合传入Reduce
函数中
- 所有任务执行完后,Master 告知客户端以结束对 MapReduce 的调用
数据结构
逻辑都清楚后就可以开始实现了,Master 作为最重要的节点,其存储了大多数必要的数据,如论文 3.2 节所示:
大致分为:
Map
和Reduce
任务的状态
- Worker 节点信息
- 输入和中间文件的位置
这里我们可以不需要存储 Worker 节点信息,因为这里是 Worker 向 Master 拉任务,而不是 Master 推任务给 Worker,Master 可以不用知道 Worker 的具体情况。只存储任务状态和文件位置即可,Master 的结构大致如下:
TaskType
要在 RPC 中传输,定义在 mr/rpc.go
中,多了一个 None
用以表示当前无任务可分配的情况:客户端(
main/mrmaster.go
)会调用 MakeMaster
初始化 Master 并启动 Master 的 RPC 服务器,初始化时把输入文件列表添加到 Map
任务中并标记为空闲:Master 中应该给 Worker 提供三种 RPC 接口:
- 心跳接口,供 Worker 判断 Master 是否活着
- 获取任务接口,Worker 从这个接口获取新任务以执行
- 通知任务完成接口,Worker 完成任务后通过该接口通知 Master
RPC 这里用的是 Go 内置的
net/rpc
,一些 RPC 过程中的数据类型定义如下:net/rpc
使用的 gob 来序列化,它不支持传递空值,所以定义一个 EmptyArgs
空结构体来表示,空结构体不占用实际内存,在 Master 中的空闲任务队列中也使用了这个方法 Task
是 Master 返回任务给 Worker 时传递的类型,包括任务 ID,Reduce 分区数,任务类型(Map 或 Reduce)和输入文件列表 FinishNotify
是 Worker 完成任务时向 Master 通知传递的类型,包括错误信息,完成的任务内容本身,输出的文件列表Master 实现
然后是 Master 的三个 RPC 接口实现:
心跳接口实际上不需要返回任何数据,RPC 调用底层会知道是否调用成功,这就够了
获取任务接口,因为需要并发修改任务状态,得加锁。然后确定待分配的任务类型,
Map
全部执行完后才执行 Reduce
,从空闲队列中取出后,返回即可。最后新建了个 goroutine 来检查任务情况,如果 10 秒内未执行完毕,可以认为 Worker 宕机或执行缓慢,重置任务进度等待下次分配最后一个通知任务完成接口,如果成功完成,就进行记录,同时如果完成的任务是
Map
的话,要将其输出的中间结果作为 Reduce
任务添加进任务队列里。最后检查是否所有任务都已完成最后完成这里有个
finish
,它的作用是通知 Done
可以返回给客户端任务已完成Worker 实现
Master 的实现基本完成,接下来是 Worker,Worker 信息的定义如下,存储
Map
函数,Reduce
函数,是否可以退出和接收的任务队列:客户端(
main/mrworker.go
)会调用 Worker()
来启动 Worker,在其中进行我们的初始化- 首先新建一个 goroutine 在后台对 Master 进行心跳检测,设定中当 Master 10s 内未响应时,Worker 可以认为 Master 已经关闭,自身便可以关闭。Worker 退出的另一个设计思路是可以添加一个
TaskType
表示「请关闭」,Worker 接收到该任务后就可以自行关闭
- 然后是一个循环,不断从 Master 处获取任务并执行,当收到退出信号时就退出
其中
healthCheck
和 pullTask
实现如下,call
是 Lab 本身已经实现好的对 RPC 调用的封装:当接收到任务时,就调用
execTask
来执行任务,它根据任务类型调用 execMap
或 execReduce
来执行,最后处理执行结果,通知回 MasterexecMap
内首先循环读入每个文件,将它们的内容传递给用户的 Map
函数执行,结果追加到 key-value 列表中,然后创建指定分区数量(NReduce
)的临时文件,通过 ihash
来确定每个 key 映射到哪个分区中,序列化数据写入到该分区的临时文件上。最后通过原子重命名来保证不会出现文件的部分写入execReduce
类似,读入文件,反序列化加载到 key-value 列表中,然后排序以让相同 key 排在一起,文件大到不足以放入内存时应该采用外部排序,不过 Lab 中可以假定不存在这种情况。然后对于相同的 key,将其 value 的集合打包调用用户的 Reduce
函数。结果也是一样写入临时文件再原子重命名测试
Worker 也搞定,最后调用
main/test-mr.sh
进行测试,其会运行五个测试- wc-test:词频统计应用测试
- indexer-test:索引构建应用测试
- map-parallelism-test:
Map
并行测试
- reduce-parallelism-test:
Reduce
并行测试
- crash-test: 宕机测试
bingo