MIT-6.824 Lab1: MapReduce-2018
MapReduce是由JeffreyDean提出的一种处理大数据的编程模型,作为在Go中编程和构建容错分布式系统的入门。
集群中有一个master,其它的都是worker,总共有M个map任务和R个reduce任务(M和R由用户指定)。master负责将map和reduce任务分配给空闲的worker并处理worker的故障。
Part I: Map/Reduce input and output
分别实现 common_map.go、common_reduce.go 中的 doMap()、doReduce() 方法。
1.读取一个输入文件inFile。调用用户定义函数mapF,将内容转换为键值对。
2.新建nReduce工作数目相等的中间文件。使用reduceName(jobName, mapTask, r)生成的中间文件名。
3.根据key-value的分配规则(ihash(key) % nReduce),将键值对存入新建的中间文件内。
func doMap(
jobName string, // the name of the MapReduce job
mapTask int, // which map task this is
inFile string,
nReduce int, // the number of reduce task that will be run ("R" in the paper)
mapF func(filename string, contents string) []KeyValue,
) {
data, err := ioutil.ReadFile(inFile)
if err != nil {
panic(err)
}
//新建中间文件
outputFiles := make([] *os.File, nReduce)
for i := 0; i < nReduce; i++ {
fileName := reduceName(jobName, mapTask, i)
outputFiles[i], err = os.Create(fileName)
if err != nil {
panic(err)
}
}
//将输入文件内容转为键值对
keyValues := mapF(inFile, string(data))
//根据hash规则将键值对存入中间文件
for _, kv := range keyValues {
index := ihash(kv.Key) % nReduce
enc := json.NewEncoder(outputFiles[index])
enc.Encode(kv)
}
for _, file := range outputFiles {
file.Close()
}
}
1.读取map工作结果的中间文件的键值对,并合并相同的key。
2.对key排序。
3.将reduceF的结果保存到mergeName()返回的文件中。
func doReduce(
jobName string, // the name of the whole MapReduce job
reduceTask int, // which reduce task this is
outFile string, // write the output here
nMap int, // the number of map tasks that were run ("M" in the paper)
reduceF func(key string, values []string) string,
) {
inputFiles := make([] *os.File, nMap)
for i := 0; i < nMap; i++ {
fileName := reduceName(jobName, i, reduceTask)//注意与中间文件名的创建保持一致
inputFiles[i], _ = os.Open(fileName)
}
//读取中间文件内容
keyValues := make(map[string][]string)
for _, inputFile := range inputFiles {
defer inputFile.Close()
dec := json.NewDecoder(inputFile)
for {
var kv KeyValue
err := dec.Decode(&kv)
if err != nil {
break
}
keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
}
}
//排序
keys := make([]string, 0, len(keyValues))
for k := range keyValues {
keys = append(keys, k)
}
sort.Strings(keys)
//新建结果文件,将key的统计结果存入。
out, err := os.Create(outFile)
if err != nil {
log.Fatal("Error in creating file", outFile)
}
defer out.Close()
enc := json.NewEncoder(out)
for _, key := range keys {
kv := KeyValue{key, reduceF(key, keyValues[key])}
enc.Encode(kv)
}
}
Part II: Single-worker word count
编写main/wc.go 中的 mapF()、reduceF()方法。
mapF() 返回一个键/值对的切片;
reduceF() 返回这个key出现了多少次,即values的长度。
func mapF(filename string, contents string) []mapreduce.KeyValue {
words := strings.FieldsFunc(contents, func(r rune) bool {
return !unicode.IsLetter(r)
})
res := make([]mapreduce.KeyValue, 0)
for _, word := range words {
res = append(res, mapreduce.KeyValue{word, ""})
}
return res
}
func reduceF(key string, values []string) string {
return strconv.Itoa(len(values))
}
Part III: Distributing MapReduce tasks
编写schedule.go中的 schedule()方法。
1.等待所有任务完成。
2.从registerChan中取出worker的地址,将任务分配给它。
注:该通道为每个工作者生成一个字符串,其中包含工作者的RPC地址。有些worker可能在调用schedule()之前存在,有些可能在schedule()运行时启动;所有这些都将出现在registerChan上。schedule()应该使用所有的worker,包括在它启动后出现的那些。
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}
fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
wg := sync.WaitGroup{}
wg.Add(ntasks)
//将任务放入队列
taskCh := make(chan int,ntasks)
for idx:=0;idx<ntasks;idx++ {
taskCh <- idx
}
//所有任务完成,关闭任务channel,退出
go func() {
wg.Wait()
close(taskCh)
}()
for idx := range taskCh{
arg := DoTaskArgs{
JobName: jobName,
File: mapFiles[idx],
Phase: phase,
TaskNumber: idx,
NumOtherPhase: n_other,
}
worker := <- registerChan
go func(worker string,arg DoTaskArgs,idx int) {
if call(worker,"Worker.DoTask",arg,nil){
wg.Done()
}else { //call失败,将任务重新放回队列
taskCh <- idx
}
//任务结束,归还工作线程
registerChan <- worker
}(worker,arg,idx)
}
fmt.Printf("Schedule: %v done\n", phase)
}
Part IV: Handling worker failures
worker 工作失败的例子
RPC失败的原因:1.请求没有达到,工作进程没有执行任务;2.工作进程可能已经执行了它,但是应答丢失;3.工作进程可能仍然在执行,但是主进程的RPC超时了。
代码实现参考part 3;
Part V: Inverted index generation (optional, does not count in grade)
生成倒排索引
func mapF(document string, value string) (res []mapreduce.KeyValue) {
words := strings.FieldsFunc(value, func(r rune) bool {
return !unicode.IsLetter(r)
})
s := make(map[string]bool)
for _, word := range words {
lower := strings.ToLower(word)
upper := strings.ToUpper(word)
_, hasUpper := s[lower]
_, hasLower := s[upper]
if !hasLower && !hasUpper {
if lower == word {
s[lower] = true
} else if upper == word {
s[upper] = true
}
}
}
for k, _ := range s {
res = append(res, mapreduce.KeyValue{k, document})
}
return
}
func reduceF(key string, values []string) string {
sort.Strings(values)
return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}