MIT6.824 Lab1 MapReduce

2022-02-1200 分钟
type
status
date
slug
summary
tags
category
icon
password
 
MIT6.824 的第一节课要做的 Lab 就是大名鼎鼎的 Google 分布式系统三驾马车之一的 MapReduce(另外两个是 GFS 和 BigTable),课程需要我们阅读 MapReduce 的论文后实现一个简化版的 MapReduce Framework
 

概览

首先应该先看论文,再 clone 项目,看看涉及 Lab1 部分的结构,弄清楚要基于什么去完成实验
从用户的角度来看,应该看我们将要实现的东西能给 Client 提供怎样的抽象。首先 /mrapps 中的就是使用 MapReduce 库的 Clients,例如 Getting started 中的 /mrapps/wc.go 是进行词频统计的,MapReduce 库需要用户实现 Map 和 Reduce 两个函数:
它们的语义和论文中的一致,不同的是,这里在 Map 中没有论文中的 Emit 操作,而是要求用户返回一个 key-value 列表,列表的每一项就相当于一个 Emit 的 key-value 对
最后将 Client 通过 go build -buildmode=plugin xxx.go 编译为动态链接库,启动一个 MapReduce Job 时和输入文件一起作为参数传入
 
实验提供了一个简单的顺序单机 MapReduce 实现在 mrsequential.go 内,可以看下它的实现,有助于理解执行过程。其首先加载了被编译为动态链接库的用户代码:
然后加载输入文件传递给用户代码的 Map 执行,将执行的中间结果存储起来
Map 执行完后对中间结果排序
最后调用 Reduce 并写入结果文件
总体流程基本对应论文的图 1 的各个阶段,只不过是单机顺序实现,没有调度和网络部分
notion image
 
然后我们需要实现自己的 MapReduce 库在 mr/master.gomr/worker.gomr/rpc.go 三个文件中,里面已经有一些定义好的方法。要实现一些细节和要注意的点包括:
  1. 一个 Master,一个或多个 Worker,通过 RPC 通信。在开始一个 MapReduce Job 时,通常先启动 Master,然后启动一个或多个 Worker(通过 main/mrmaster.go 和 main/mrworker.go
  1. 容错机制,包括 Worker、Master 宕机的处理
    1. Master 的宕机可以和论文一样,交给客户端重试,不同之处是论文中持久化了 checkpoint 使得 Master 在宕机恢复后可以恢复任务进度,但 Lab 中没有要求我们实现;对于 Worker,如果执行任务超时,则 Master 将认为它宕机或执行缓慢,此时重新分配任务到另一个 Worker
      另一方面,应该确保宕机时不会出现文件部分写入的情况,一个方法是在文件写入后重命名(大部分操作系统的文件系统实现中,重命名是原子操作)
  1. 同论文一样,Map 的输出(中间结果)也应该写入到文件中,然后划分成多个 bucket,这保证相同 key 都在同一文件中,能在同一 Worker 里被打包到 Reduce 执行,这个分区数量可以通过启动 Master 时的参数指定
  1. 每个 Reduce 的结果输出到单独的文件上
  1. Master 应该实现 Done 以明确地告知客户端 MapReduce 是否执行完毕
  1. 结束后应该正确地回收每个 Worker 进程
 
参考论文第 3 节,捋一下我们的流程:
  1. 首先启动 Master,然后启动一个或多个 Worker,Worker 启动时向 Master 获取任务
  1. Master 为 Worker 分配任务后,要监视其是否完成或超时,超时后将任务分配给其他 Worker。任务一开始只应该分配 Map ,所有 Map 执行完成后才分配 Reduce
  1. 当 Worker 完成一个任务后,继续获取任务
  1. Map 中,Master 需要传递给 Worker 输入文件的位置,Worker 执行 Map 完后写入中间结果到本地并进行分区,然后返回位置给 Master
  1. Master 需要等待 Map 都执行完毕才分配 Reduce,为 Worker 分配 Reduce 时要传递同一分区的中间结果文件的位置,Worker 读取文件并进行排序以将相同的 key 排到一起,然后将相同的 key 对应的 value 集合传入 Reduce 函数中
  1. 所有任务执行完后,Master 告知客户端以结束对 MapReduce 的调用
 

数据结构

逻辑都清楚后就可以开始实现了,Master 作为最重要的节点,其存储了大多数必要的数据,如论文 3.2 节所示:
notion image
大致分为:
  1. Map 和 Reduce 任务的状态
  1. Worker 节点信息
  1. 输入和中间文件的位置
这里我们可以不需要存储 Worker 节点信息,因为这里是 Worker 向 Master 拉任务,而不是 Master 推任务给 Worker,Master 可以不用知道 Worker 的具体情况。只存储任务状态和文件位置即可,Master 的结构大致如下:
TaskType 要在 RPC 中传输,定义在 mr/rpc.go 中,多了一个 None 用以表示当前无任务可分配的情况:
客户端(main/mrmaster.go)会调用 MakeMaster 初始化 Master 并启动 Master 的 RPC 服务器,初始化时把输入文件列表添加到 Map 任务中并标记为空闲:
 
Master 中应该给 Worker 提供三种 RPC 接口:
  • 心跳接口,供 Worker 判断 Master 是否活着
  • 获取任务接口,Worker 从这个接口获取新任务以执行
  • 通知任务完成接口,Worker 完成任务后通过该接口通知 Master
RPC 这里用的是 Go 内置的 net/rpc,一些 RPC 过程中的数据类型定义如下:
net/rpc 使用的 gob 来序列化,它不支持传递空值,所以定义一个 EmptyArgs 空结构体来表示,空结构体不占用实际内存,在 Master 中的空闲任务队列中也使用了这个方法
Task 是 Master 返回任务给 Worker 时传递的类型,包括任务 ID,Reduce 分区数,任务类型(Map 或 Reduce)和输入文件列表 FinishNotify 是 Worker 完成任务时向 Master 通知传递的类型,包括错误信息,完成的任务内容本身,输出的文件列表
 

Master 实现

然后是 Master 的三个 RPC 接口实现:
心跳接口实际上不需要返回任何数据,RPC 调用底层会知道是否调用成功,这就够了
获取任务接口,因为需要并发修改任务状态,得加锁。然后确定待分配的任务类型,Map 全部执行完后才执行 Reduce,从空闲队列中取出后,返回即可。最后新建了个 goroutine 来检查任务情况,如果 10 秒内未执行完毕,可以认为 Worker 宕机或执行缓慢,重置任务进度等待下次分配
最后一个通知任务完成接口,如果成功完成,就进行记录,同时如果完成的任务是 Map 的话,要将其输出的中间结果作为 Reduce 任务添加进任务队列里。最后检查是否所有任务都已完成
最后完成这里有个 finish ,它的作用是通知 Done 可以返回给客户端任务已完成
 

Worker 实现

Master 的实现基本完成,接下来是 Worker,Worker 信息的定义如下,存储 Map 函数,Reduce 函数,是否可以退出和接收的任务队列:
客户端(main/mrworker.go)会调用 Worker() 来启动 Worker,在其中进行我们的初始化
  • 首先新建一个 goroutine 在后台对 Master 进行心跳检测,设定中当 Master 10s 内未响应时,Worker 可以认为 Master 已经关闭,自身便可以关闭。Worker 退出的另一个设计思路是可以添加一个 TaskType 表示「请关闭」,Worker 接收到该任务后就可以自行关闭
  • 然后是一个循环,不断从 Master 处获取任务并执行,当收到退出信号时就退出
其中 healthCheck 和 pullTask 实现如下,call 是 Lab 本身已经实现好的对 RPC 调用的封装:
当接收到任务时,就调用 execTask 来执行任务,它根据任务类型调用 execMap 或 execReduce 来执行,最后处理执行结果,通知回 Master
execMap 内首先循环读入每个文件,将它们的内容传递给用户的 Map 函数执行,结果追加到 key-value 列表中,然后创建指定分区数量(NReduce)的临时文件,通过 ihash 来确定每个 key 映射到哪个分区中,序列化数据写入到该分区的临时文件上。最后通过原子重命名来保证不会出现文件的部分写入
execReduce 类似,读入文件,反序列化加载到 key-value 列表中,然后排序以让相同 key 排在一起,文件大到不足以放入内存时应该采用外部排序,不过 Lab 中可以假定不存在这种情况。然后对于相同的 key,将其 value 的集合打包调用用户的 Reduce 函数。结果也是一样写入临时文件再原子重命名
 

测试

Worker 也搞定,最后调用 main/test-mr.sh 进行测试,其会运行五个测试
  1. wc-test:词频统计应用测试
  1. indexer-test:索引构建应用测试
  1. map-parallelism-test: Map 并行测试
  1. reduce-parallelism-test: Reduce 并行测试
  1. crash-test: 宕机测试
notion image
bingo

下一篇

蒟蒻的 2021 年度总结

Loading...