Mit6.824 lab1全解析(推导历程+代码)

Mit6.824 lab1全解析(推导历程+代码),第1张

0.前言

mit 6.824分布式系统
课程主页

lab1是第一次作业,本菜鸡用了好几天独立完成,经过一次改版优化了数据结构和解决任务元数据并发环境下的data race问题,建议大家做之前有自己独立的思考,有很多可行方案都能完成任务。比如看到有的小伙伴采用master(coordinator)轮询slave(worker)进行交互,我是用slave定时发送请求触发master懒执行大部分任务(后面会聊到原因)。也有的小伙伴用队列增删加锁实现并发安全,本人用的golang自带的channel作为任务队列。不得不感叹人家本科生就有机会学这么有意思的课程,听说lab2更酸爽,后面会接着去冲塔。总之,集中一段时间做这个lab1挺有趣的。

我的代码在这里 :送餐员小李Gitee

1.实验准备

MapReduce论文阅读

Golang入门:需要掌握 基本语法/IDE配置/数组/面向对象/管道协程,小案例都实现一遍即可,十几个小时够了

Golang核心-韩顺平

阅读 lab1 note lab1
看下mrsequential.go的流程,全程一个worker干到底无coordinator,看下map,reduce函数的加载和数据的流向

2 goland配置调试环境 windows

windows不支持动态插件编译,硬塞一个linux编译好的so文件也不行,当然可以通过改代码的形式把map reduce 的wordcount函数硬编码进main/worker.go,但不建议这样做后面会很麻烦。

buildmode=plugin not supported on windows/amd64

VMware搞个Ubuntu,然后golang,goland配置好

Ubuntu 16 镜像下载
VMware
VMware如何安装Ubuntu

linux

GOPATH选择git clone下来的文件夹6.824

GOROOT是golang的安装位置

wc.so运行前每次都要重新build,因此在写个build-wc.sh脚本放在main路径下面 configuration取名"build wc"

也可以在脚本加一个去掉"mr-"开头文件的句子,把后面频繁调试mr程序生成的结果文件去掉,省的每次运行前都去手删

go build -buildmode=plugin ../mrapps/wc.go
# rm -f mr-*

mrsequential.go 最下面的Before launch点击加号Run Another Configuration添加"build wc",这样每次运行之前都build一下wc.so

至此mrsequential.go 应该可以在goland运行起来了

同理mrworker.go也需要这样配置

mrcoordinator.go 需要将所有mian下面的txt文本加入进去

pg-being_ernest.txt pg-dorian_gray.txt pg-frankenstein.txt pg-grimm.txt pg-huckleberry_finn.txt pg-metamorphosis.txt pg-sherlock_holmes.txt pg-tom_sawyer.txt

2.任务梳理

lab1 note 里面的内容,内容梳理,一开始比较模糊也没关系,做到哪里回看就行

2.1 描述

实现分布式mr,一个coordinator,一个worker(启动多个),在这次实验都在一个机器上运行。worker通过rpc和coordinator交互。worker请求任务,进行运算,写出结果到文件。coordinator需要关心worker的任务是否完成,在超时情况下将任务重新分配给别的worker。

2.2 规则

map阶段需要将中间keys分成nReduce个数, nReduce通过main/mrcoordinator.go传给MakeCoordinator()

worker需要将第X个reduce task结果放到mr-out-X中。

mr-out-X要一行一行生成,kv形式。main/mrsequential.go中有,拿来就完事了

main/mrcoordinator.go从mr/coordinator.go 的 Done()方法得知任务完成并关闭自己。

任务都完成后,worker也得关闭

2.3 提示

一开始可以从mr/worker.go的 Worker()方法做,发送rpc给coordinator请求任务,然后coordinator分配任务,然后worker读文件并且map函数处理。

map reduce函数都是通过go插件装载 (.so文件)

mr/ 文件变了就需要重新build

都在一个文件系统,worker天然实现文件共享,先凑合着起步

中间文件命名 mr-X-Y X是map任务号,y是reduce任务号

worker的map方法用json存储中间kv对,reduce再读回来,因为真正分布式worker都不在一个机器上,涉及网络传输,所以用json编码解码走个过场。

worker的map可以用 worker.go里面的ihash(key)得到特定key的reduce任务号

mrsequential.go 代码可以借鉴

coordinator里面的共享数据需要加锁

worker有时候需要等待,比如当map任务都分发出去了,有的worker完成后又来申请任务,此时还有map未完成,reduce不能开始,这个worker需要等待下

如果任务重试机制,记得不要生成重复任务

mrapps/crash.go 随机干掉map reduce,看crash.go的代码是有一定几率让worker直接退出或者长时间延迟,可以用来测试恢复功能。这个逻辑是整合在map reduce函数里面的,注意worker被干掉时候任务已经拿到手了。

确保没有人在出现崩溃时观察部分写入的文件,用ioutil.TempFile创建临时文件,用os.Rename重命名

3.开始做任务

每一步提示思路,是我的心路历程,应该符合大多数人的逻辑

3.1 小目标:coordinator分配好任务worker取到后打印出来

首先目标锁定在coordinator和worker的互动,其他的都不需要考虑,worker打印出来自己要做任务的文件名

mrsequential.go 里面逻辑看懂

看懂coordinator和worker的rpc交互流程

call 函数通过1234端口传入args和reply的内存地址,调用rpcname(Coordinator.函数名),通过反射机制"远程"调用Coordinator的该函数,Coordinator通过内存地址读取入参写出结果。worker.go里面的Worker方法调用CallExample,先运行Coordinator,再运行worker,看看worker端打印返回来的经过Coordinator加工过的数字

至此worker和coordinator可以互动了

发放给worker任务的结构体

Job类型(枚举常量):JobType,以便worker知道这是Map任务还是Reduce任务。

InputFile 文件名数组 :map情况数组里面就一个分配给worker的文件(Hadoop里面大文件是需要切块的,但是这里面的文件都很小就不切了直接一个文件给一个worker),reduce情况下是worker需要选取的一些需要聚合到一起的中间文件

JobId/ReducerNum worker需要知道这些以便生成中间结果文件"mr-tmp-x-y"
x是jobid,y是经过hash后的reduce id, y用来标识哪些文件汇入同一个reduce

type Job struct {
   JobType    JobType
   InputFile  []string
   JobId      int
   ReducerNum int
   //TmpFileList []string
}
任务存放

放channel里面就行,省得自己写队列的各种方法,还能天然并发安全

channel 是golang特有的类型化消息的队列,可以通过它们发送类型化的数据在协程之间通信,可以避开所有内存共享导致的坑;通道的通信方式保证了同步性。数据通过通道:同一时间只有一个协程可以访问数据:所以不会出现数据竞争,设计如此。数据的归属(可以读写数据的能力)被传递。

因此Coordinator定义了两个channel来存放做好的map和reduce,jobMetaHolder(元数据管理相关)和CoordinatorCondition(coordinator状态),暂时不用管可以注释掉

type Coordinator struct {
   // channel which hold uncompleted task
   JobChannelMap        chan *Job
   JobChannelReduce     chan *Job
   ReducerNum           int
   MapNum               int
   CoordinatorCondition Condition
   uniqueJobId          int
   jobMetaHolder        JobMetaHolder
}

map任务制作
Coordinator制作map任务,在一开始程序运行的时候就执行
func (c *Coordinator) makeMapJobs(files []string) {
  for _, v := range files {
  	id := c.generateJobId()
  	//fmt.Println("making map job :", id)
  	job := Job{
  		JobType:    MapJob,
  		InputFile:  []string{v},
  		JobId:      id,
  		ReducerNum: c.ReducerNum,
  	}

  //这下面暂时不需要
  	jobMetaINfo := JobMetaInfo{
  		condition: JobWaiting,
  		JobPtr:    &job,
  	}
  	c.jobMetaHolder.putJob(&jobMetaINfo)
  	fmt.Println("making map job :", &job)
  	c.JobChannelMap <- &job
  }
  //这上面暂时不需要
  fmt.Println("done making map jobs")
  c.jobMetaHolder.checkJobDone()
}

照着把那个样例rpc交互函数写个distribute方法,把的coordinate端和worker端入参数据类型分别改下,加入这句话即可将JobChannelMap里面的一个job给reply

 *reply = *<-c.JobChannelMap

worker端对reply取InputFile第一个元素,打印结果如下,worker线程取到任务了

worker get job which is pg-being_ernest.txt
3.1小目标完成 3.2 worker通过传过来的文件名做map任务,写出结果

这个步骤简单粗暴,照着mrsequential.go里面写一下,记得用ihash处理下key分成Nreduce份用json编码后写出到"mr-tmp-x-y"文件。注意mr论文这步是有排序的,因为真正生产活动数据量是非常巨大的,map端提前排序好后,reduce的排序压力会减小很多。这里排不排序无所谓。

3.2小目标完成 3.3 coordinator感知各个job任务运行完毕和map转reduce时机

任务元数据管理

从coordinator视角看任务分发

制作任务 -> 放入队列 -> worker来取

如果worker维护任务的状态显然不合理,每个任务的运行开始时间,任务状态。这些内容worker没必要知晓,是coordinator用来判断任务超时,或者map转reduce的。因此用JobMetaInfo把Job + Job状态包装下

制作任务 -> 放入JobMetaInfo(元数据) -> 放入队列 -> worker来取

type JobMetaInfo struct {
condition JobCondition
StartTime time.Time
JobPtr *Job
}


 condition job状态: 包含等待,运行,完成
 StartTime : 开始运行的时间(等待变为运行)
 JobPtr  : job内存地址(golang是值传递,用地址更高效)

所有任务用map存储在JobMetaHolder,key是job的唯一id
```go
type JobMetaHolder struct {
  MetaMap map[int]*JobMetaInfo
}

针对这个JobMetaHolder的 *** 作都可以安排上了,比如放入任务,任务状态更新,检查同一阶段任务是否完成

放入任务函数 putJob

 func (j *JobMetaHolder) putJob(JobInfo *JobMetaInfo) bool {
  jobId := JobInfo.JobPtr.JobId
  meta, _ := j.MetaMap[jobId]
  if meta != nil {
  	fmt.Println("meta contains job which id = ", jobId)
  	return false
  } else {
  	j.MetaMap[jobId] = JobInfo
  }
  return true
}

任务发射方法,当channel给出任务后,元数据管理器对任务元数据进行状态变更和运行开始时间记录(后面超时任务有用)。和Kafka的InflightMessage有点像。

func (j *JobMetaHolder) fireTheJob(jobId int) bool {
	ok, jobInfo := j.getJobMetaInfo(jobId)
	if !ok || jobInfo.condition != JobWaiting {
		return false
	}
	jobInfo.condition = JobWorking
	jobInfo.StartTime = time.Now()
	return true
}

检查当前阶段任务是否完成。因为每次制作jobs后,实在加锁情况下一股脑更新到元数据的,因此这边通过遍历先检查reduce完成未完成数量再检查map就能判断两种情况下的
完成情况。每次也会print 任务的数量信息方便调试

func (j *JobMetaHolder) checkJobDone() bool {
  reduceDoneNum := 0
  reduceUndoneNum := 0
  mapDoneNum := 0
  mapUndoneNum := 0
  for _, v := range j.MetaMap {
  	if v.JobPtr.JobType == MapJob {
  		if v.condition == JobDone {
  			mapDoneNum += 1
  		} else {
  			mapUndoneNum++
  		}
  	} else {
  		if v.condition == JobDone {
  			reduceDoneNum++
  		} else {
  			reduceUndoneNum++
  		}
  	}
  }
  fmt.Printf("%d/%d map jobs are done, %d/%d reduce job are done\n",
  	mapDoneNum, mapDoneNum+mapUndoneNum, reduceDoneNum, reduceDoneNum+reduceUndoneNum)

  return (reduceDoneNum > 0 && reduceUndoneNum == 0) || (mapDoneNum > 0 && mapUndoneNum == 0)
}
coordinator的状态转换
每次当channel的长度为0的时候,去checkJobDone检查一下,当这个阶段所有任务都完成以后,进行状态转换并作出相应 *** 作nextPhase()
func (c *Coordinator) nextPhase() {
	if c.CoordinatorCondition == MapPhase {
		c.makeReduceJobs()
		c.CoordinatorCondition = ReducePhase
	} else if c.CoordinatorCondition == ReducePhase {
		c.CoordinatorCondition = AllDone
	}
}
3.3小目标完成 3.4 coordinator 的任务分配函数 任务分配方法
任务分配方法是coordinator最核心的函数,worker每次来询问都会调用这个方法
func (c *Coordinator) DistributeJob(args *ExampleArgs, reply *Job) error {
	mu.Lock()
	defer mu.Unlock()
	fmt.Println("coordinator get a request from worker :")
	if c.CoordinatorCondition == MapPhase {
		if len(c.JobChannelMap) > 0 {
			*reply = *<-c.JobChannelMap
			if !c.jobMetaHolder.fireTheJob(reply.JobId) {
				fmt.Printf("[duplicated job id]job %d is running\n", reply.JobId)
			}
		} else {
			reply.JobType = WaittingJob
			if c.jobMetaHolder.checkJobDone() {
				c.nextPhase()
			}
			return nil
		}
	} else if c.CoordinatorCondition == ReducePhase {
		if len(c.JobChannelReduce) > 0 {
			*reply = *<-c.JobChannelReduce
			if !c.jobMetaHolder.fireTheJob(reply.JobId) {
				fmt.Printf("job %d is running\n", reply.JobId)
			}
		} else {
			reply.JobType = WaittingJob
			if c.jobMetaHolder.checkJobDone() {
				c.nextPhase()
			}
			return nil
		}
	} else {
		reply.JobType = KillJob
	}
	return nil
}

worker完成任务调用的 *** 作
这里使用了判断元数据的办法看看任务是否会重复完成,在后面的崩溃测试下,worker会失效,coordinator需要重新发放Jobg给另一个worker,加入刚才那个失效的worker恢复了然后写入回来,不能覆盖已经完成的数据(甚至都下一个阶段了)。

func (c *Coordinator) JobIsDone(args *Job, reply *ExampleReply) error {
  mu.Lock()
  defer mu.Unlock()
  switch args.JobType {
  case MapJob:
  	ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)
  	//prevent a duplicated work which returned from another worker
  	if ok && meta.condition == JobWorking {
  		meta.condition = JobDone
  		fmt.Printf("Map task on %d complete\n", args.JobId)
  	} else {
  		fmt.Println("[duplicated] job done", args.JobId)
  	}
  	break
  case ReduceJob:
  	fmt.Printf("Reduce task on %d complete\n", args.JobId)
  	ok, meta := c.jobMetaHolder.getJobMetaInfo(args.JobId)
  	//prevent a duplicated work which returned from another worker
  	if ok && meta.condition == JobWorking {
  		meta.condition = JobDone
  	} else {
  		fmt.Println("[duplicated] job done", args.JobId)
  	}
  	break
  default:
  	panic("wrong job done")
  }
  return nil

``

3.5 加锁

由于我用了channel所以在任务分配队列实现了天然并发安全,但是在别的地方还是遇到了问题,比如Done函数通过mrcoordinator主线程去时不时读取coordinator的状态来判断是否结束死循环。还有在一个worker调coordinator拉取数据的时候,另一个worker调coordinator的checkJobDone()函数进行检查。因此在响应可能发生冲突的地方加锁。

3.6 crash test

这里我是又起了一个线程去检查JobMetaHolder里面超时的任务,具体可以参考代码
但是无法通过test-mr.sh

去看了crash.go和test-mr.sh里面的逻辑,发现起的worker线程太少了,crash.go里面的maybecrash方法很有可能瘫痪这个worker,只起一个甚至三个都会导致最后没有可用worker,即使将任务重新放到channel里面

因此我修改了test-mr.sh,起了更多的worker,通过了测试。

欢迎分享,转载请注明来源:内存溢出

原文地址: http://outofmemory.cn/langs/989768.html

(0)
打赏 微信扫一扫 微信扫一扫 支付宝扫一扫 支付宝扫一扫
上一篇 2022-05-21
下一篇 2022-05-21

发表评论

登录后才能评论

评论列表(0条)

保存