mit 6.824分布式系统
课程主页
lab1是第一次作业,本菜鸡用了好几天独立完成,经过一次改版优化了数据结构和解决任务元数据并发环境下的data race问题,建议大家做之前有自己独立的思考,有很多可行方案都能完成任务。比如看到有的小伙伴采用master(coordinator)轮询slave(worker)进行交互,我是用slave定时发送请求触发master懒执行大部分任务(后面会聊到原因)。也有的小伙伴用队列增删加锁实现并发安全,本人用的golang自带的channel作为任务队列。不得不感叹人家本科生就有机会学这么有意思的课程,听说lab2更酸爽,后面会接着去冲塔。总之,集中一段时间做这个lab1挺有趣的。
我的代码在这里 :送餐员小李Gitee
1.实验准备MapReduce论文阅读
Golang入门:需要掌握 基本语法/IDE配置/数组/面向对象/管道协程,小案例都实现一遍即可,十几个小时够了
Golang核心-韩顺平
阅读 lab1 note lab1
看下mrsequential.go的流程,全程一个worker干到底无coordinator,看下map,reduce函数的加载和数据的流向
windows不支持动态插件编译,硬塞一个linux编译好的so文件也不行,当然可以通过改代码的形式把map reduce 的wordcount函数硬编码进main/worker.go,但不建议这样做后面会很麻烦。
buildmode=plugin not supported on windows/amd64
VMware搞个Ubuntu,然后golang,goland配置好
Ubuntu 16 镜像下载
VMware
VMware如何安装Ubuntu
GOPATH选择git clone下来的文件夹6.824
GOROOT是golang的安装位置
wc.so运行前每次都要重新build,因此在写个build-wc.sh脚本放在main路径下面 configuration取名"build wc"
也可以在脚本加一个去掉"mr-"开头文件的句子,把后面频繁调试mr程序生成的结果文件去掉,省的每次运行前都去手删
go build -buildmode=plugin ../mrapps/wc.go
# rm -f mr-*
mrsequential.go 最下面的Before launch点击加号Run Another Configuration添加"build wc",这样每次运行之前都build一下wc.so
至此mrsequential.go 应该可以在goland运行起来了同理mrworker.go也需要这样配置
mrcoordinator.go 需要将所有mian下面的txt文本加入进去
2.任务梳理pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt
lab1 note 里面的内容,内容梳理,一开始比较模糊也没关系,做到哪里回看就行
2.1 描述实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。
2.2 规则map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()
worker需要将第X个reduce task结果放到mr-out-X中。
mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了
main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。
任务都完成后,worker也得关闭
2.3 提示一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。
map reduce函数都是通过go插件装载 (.so文件)
mr/ 文件变了就需要重新build
都在一个文件系统,worker天然实现文件共享,先凑合着起步
中间文件命名 mr-X-Y X是map任务号,y是reduce任务号
worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。
worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号
mrsequential.go 代码可以借鉴
coordinator里面的共享数据需要加锁
worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下
如果任务重试机制,记得不要生成重复任务
mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。
确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名
3.开始做任务每一步提示思路,是我的心路历程,应该符合大多数人的逻辑
3.1 小目标:coordinator分配好任务worker取到后打印出来首先目标锁定在coordinator和worker的互动,其他的都不需要考虑,worker打印出来自己要做任务的文件名
mrsequential.go 里面逻辑看懂
看懂coordinator和worker的rpc交互流程
call 函数通过1234端口传入args和reply的内存地址,调用rpcname(Coordinator.函数名),通过反射机制"远程"调用Coordinator的该函数,Coordinator通过内存地址读取入参写出结果。worker.go里面的Worker方法调用CallExample,先运行Coordinator,再运行worker,看看worker端打印返回来的经过Coordinator加工过的数字
至此worker和coordinator可以互动了
发放给worker任务的结构体
Job类型(枚举常量):JobType,以便worker知道这是Map任务还是Reduce任务。
InputFile 文件名数组 :map情况数组里面就一个分配给worker的文件(Hadoop里面大文件是需要切块的,但是这里面的文件都很小就不切了直接一个文件给一个worker),reduce情况下是worker需要选取的一些需要聚合到一起的中间文件
JobId/ReducerNum worker需要知道这些以便生成中间结果文件"mr-tmp-x-y"
x是jobid,y是经过hash后的reduce id, y用来标识哪些文件汇入同一个reduce
type Job struct {
JobType JobType
InputFile []string
JobId int
ReducerNum int
//TmpFileList []string
}
任务存放
放channel里面就行,省得自己写队列的各种方法,还能天然并发安全
channel 是golang特有的类型化消息的队列,可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。
因此Coordinator定义了两个channel来存放做好的map和reduce,jobMetaHolder(元数据管理相关)和CoordinatorCondition(coordinator状态),暂时不用管可以注释掉
type Coordinator struct {
// channel which hold uncompleted task
JobChannelMap chan *Job
JobChannelReduce chan *Job
ReducerNum int
MapNum int
CoordinatorCondition Condition
uniqueJobId int
jobMetaHolder JobMetaHolder
}
map任务制作Coordinator制作map任务,在一开始程序运行的时候就执行
func (c *Coordinator) makeMapJobs(files []string) {
for _, v := range files {
id := c.generateJobId()
//fmt.Println("making map job :", id)
job := Job{
JobType: MapJob,
InputFile: []string{v},
JobId: id,
ReducerNum: c.ReducerNum,
}
//这下面暂时不需要
jobMetaINfo := JobMetaInfo{
condition: JobWaiting,
JobPtr: &job,
}
c.jobMetaHolder.putJob(&jobMetaINfo)
fmt.Println("making map job :", &job)
c.JobChannelMap <- &job
}
//这上面暂时不需要
fmt.Println("done making map jobs")
c.jobMetaHolder.checkJobDone()
}
照着把那个样例rpc交互函数写个distribute方法,把的coordinate端和worker端入参数据类型分别改下,加入这句话即可将JobChannelMap里面的一个job给reply
*reply = *<-c.JobChannelMap
worker端对reply取InputFile第一个元素,打印结果如下,worker线程取到任务了
worker get job which is pg-being_ernest.txt
3.1小目标完成
3.2 worker通过传过来的文件名做map任务,写出结果
这个步骤简单粗暴,照着mrsequential.go里面写一下,记得用ihash处理下key分成Nreduce份用json编码后写出到"mr-tmp-x-y"文件。注意mr论文这步是有排序的,因为真正生产活动数据量是非常巨大的,map端提前排序好后,reduce的排序压力会减小很多。这里排不排序无所谓。
3.2小目标完成 3.3 coordinator感知各个job任务运行完毕和map转reduce时机任务元数据管理
从coordinator视角看任务分发
制作任务 -> 放入队列 -> worker来取
如果worker维护任务的状态显然不合理,每个任务的运行开始时间,任务状态。这些内容worker没必要知晓,是coordinator用来判断任务超时,或者map转reduce的。因此用JobMetaInfo把Job + Job状态包装下
制作任务 -> 放入JobMetaInfo(元数据) -> 放入队列 -> worker来取
type JobMetaInfo struct {
condition JobCondition
StartTime time.Time
JobPtr *Job
}
condition job状态: 包含等待,运行,完成
StartTime : 开始运行的时间(等待变为运行)
JobPtr : job内存地址(golang是值传递,用地址更高效)
所有任务用map存储在JobMetaHolder,key是job的唯一id
```go
type JobMetaHolder struct {
MetaMap map[int]*JobMetaInfo
}
针对这个JobMetaHolder的 *** 作都可以安排上了,比如放入任务,任务状态更新,检查同一阶段任务是否完成
放入任务函数 putJob
func (j *JobMetaHolder) putJob(JobInfo *JobMetaInfo) bool {
jobId := JobInfo.JobPtr.JobId
meta, _ := j.MetaMap[jobId]
if meta != nil {
fmt.Println("meta contains job which id = ", jobId)
return false
} else {
j.MetaMap[jobId] = JobInfo
}
return true
}
任务发射方法,当channel给出任务后,元数据管理器对任务元数据进行状态变更和运行开始时间记录(后面超时任务有用)。和Kafka的InflightMessage有点像。
func (j *JobMetaHolder) fireTheJob(jobId int) bool {
ok, jobInfo := j.getJobMetaInfo(jobId)
if !ok || jobInfo.condition != JobWaiting {
return false
}
jobInfo.condition = JobWorking
jobInfo.StartTime = time.Now()
return true
}
检查当前阶段任务是否完成。因为每次制作jobs后,实在加锁情况下一股脑更新到元数据的,因此这边通过遍历先检查reduce完成未完成数量再检查map就能判断两种情况下的
完成情况。每次也会print 任务的数量信息方便调试
func (j *JobMetaHolder) checkJobDone() bool {
reduceDoneNum := 0
reduceUndoneNum := 0
mapDoneNum := 0
mapUndoneNum := 0
for _, v := range j.MetaMap {
if v.JobPtr.JobType == MapJob {
if v.condition == JobDone {
mapDoneNum += 1
} else {
mapUndoneNum++
}
} else {
if v.condition == JobDone {
reduceDoneNum++
} else {
reduceUndoneNum++
}
}
}
fmt.Printf("%d/%d map jobs are done, %d/%d reduce job are done\n",
mapDoneNum, mapDoneNum+mapUndoneNum, reduceDoneNum, reduceDoneNum+reduceUndoneNum)
return (reduceDoneNum > 0 && reduceUndoneNum == 0) || (mapDoneNum > 0 && mapUndoneNum == 0)
}
coordinator的状态转换每次当channel的长度为0的时候,去checkJobDone检查一下,当这个阶段所有任务都完成以后,进行状态转换并作出相应 *** 作nextPhase()
func (c *Coordinator) nextPhase() {
if c.CoordinatorCondition == MapPhase {
c.makeReduceJobs()
c.CoordinatorCondition = ReducePhase
} else if c.CoordinatorCondition == ReducePhase {
c.CoordinatorCondition = AllDone
}
}
3.3小目标完成
3.4 coordinator 的任务分配函数
任务分配方法任务分配方法是coordinator最核心的函数,worker每次来询问都会调用这个方法
func (c *Coordinator) DistributeJob(args *ExampleArgs, reply *Job) error {
mu.Lock()
defer mu.Unlock()
fmt.Println("coordinator get a request from worker :")
if c.CoordinatorCondition == MapPhase {
if len(c.JobChannelMap) > 0 {
*reply = *<-c.JobChannelMap
if !c.jobMetaHolder.fireTheJob(reply.JobId) {
fmt.Printf("[duplicated job id]job %d is running\n", reply.JobId)
}
} else {
reply.JobType = WaittingJob
if c.jobMetaHolder.checkJobDone() {
c.nextPhase()
}
return nil
}
} else if c.CoordinatorCondition == ReducePhase {
if len(c.JobChannelReduce) > 0 {
*reply = *<-c.JobChannelReduce
if !c.jobMetaHolder.fireTheJob(reply.JobId) {
fmt.Printf("job %d is running\n", reply.JobId)
}
} else {
reply.JobType = WaittingJob
if c.jobMetaHolder.checkJobDone() {
c.nextPhase()
}
return nil
}
} else {
reply.JobType = KillJob
}
return nil
}
worker完成任务调用的 *** 作
这里使用了判断元数据的办法看看任务是否会重复完成,在后面的崩溃测试下,worker会失效,coordinator需要重新发放Jobg给另一个worker,加入刚才那个失效的worker恢复了然后写入回来,不能覆盖已经完成的数据(甚至都下一个阶段了)。
func (c *Coordinator) JobIsDone(args *Job, reply *ExampleReply) error {
mu.Lock()
defer mu.Unlock()
switch args.JobType {
case MapJob:
ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)
//prevent a duplicated work which returned from another worker
if ok && meta.condition == JobWorking {
meta.condition = JobDone
fmt.Printf("Map task on %d complete\n", args.JobId)
} else {
fmt.Println("[duplicated] job done", args.JobId)
}
break
case ReduceJob:
fmt.Printf("Reduce task on %d complete\n", args.JobId)
ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)
//prevent a duplicated work which returned from another worker
if ok && meta.condition == JobWorking {
meta.condition = JobDone
} else {
fmt.Println("[duplicated] job done", args.JobId)
}
break
default:
panic("wrong job done")
}
return nil
``
3.5 加锁由于我用了channel所以在任务分配队列实现了天然并发安全,但是在别的地方还是遇到了问题,比如Done函数通过mrcoordinator主线程去时不时读取coordinator的状态来判断是否结束死循环。还有在一个worker调coordinator拉取数据的时候,另一个worker调coordinator的checkJobDone()函数进行检查。因此在响应可能发生冲突的地方加锁。
3.6 crash test这里我是又起了一个线程去检查JobMetaHolder里面超时的任务,具体可以参考代码
但是无法通过test-mr.sh
去看了crash.go和test-mr.sh里面的逻辑,发现起的worker线程太少了,crash.go里面的maybecrash方法很有可能瘫痪这个worker,只起一个甚至三个都会导致最后没有可用worker,即使将任务重新放到channel里面
因此我修改了test-mr.sh,起了更多的worker,通过了测试。
欢迎分享,转载请注明来源:内存溢出
评论列表(0条)