MIT-6.824 Lab1: MapReduce-2018

2019-10-31  本文已影响0人  yddeng

MapReduce是由JeffreyDean提出的一种处理大数据的编程模型,作为在Go中编程和构建容错分布式系统的入门。
集群中有一个master,其它的都是worker,总共有M个map任务和R个reduce任务(M和R由用户指定)。master负责将map和reduce任务分配给空闲的worker并处理worker的故障。

Part I: Map/Reduce input and output

分别实现 common_map.go、common_reduce.go 中的 doMap()、doReduce() 方法。

1.读取一个输入文件inFile。调用用户定义函数mapF,将内容转换为键值对。
2.新建nReduce工作数目相等的中间文件。使用reduceName(jobName, mapTask, r)生成的中间文件名。
3.根据key-value的分配规则(ihash(key) % nReduce),将键值对存入新建的中间文件内。

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    data, err := ioutil.ReadFile(inFile)
    if err != nil {
        panic(err)
    }
    //新建中间文件
    outputFiles := make([] *os.File, nReduce)
    for i := 0; i < nReduce; i++ {
        fileName := reduceName(jobName, mapTask, i)
        outputFiles[i], err = os.Create(fileName)
        if err != nil {
            panic(err)
        }
    }
    //将输入文件内容转为键值对
    keyValues := mapF(inFile, string(data))
    //根据hash规则将键值对存入中间文件
    for _, kv := range keyValues {
        index := ihash(kv.Key) % nReduce
        enc := json.NewEncoder(outputFiles[index])
        enc.Encode(kv)
    }
    for _, file := range outputFiles {
        file.Close()
    }
}

1.读取map工作结果的中间文件的键值对,并合并相同的key。
2.对key排序。
3.将reduceF的结果保存到mergeName()返回的文件中。

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    inputFiles := make([] *os.File, nMap)
    for i := 0; i < nMap; i++ {
        fileName := reduceName(jobName, i, reduceTask)//注意与中间文件名的创建保持一致
        inputFiles[i], _ = os.Open(fileName)
    }
    //读取中间文件内容
    keyValues := make(map[string][]string)
    for _, inputFile := range inputFiles {
        defer inputFile.Close()
        dec := json.NewDecoder(inputFile)
        for {
            var kv KeyValue
            err := dec.Decode(&kv)
            if err != nil {
                break
            }
            keyValues[kv.Key] = append(keyValues[kv.Key], kv.Value)
        }
    }
    //排序
    keys := make([]string, 0, len(keyValues))
    for k := range keyValues {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    //新建结果文件,将key的统计结果存入。
    out, err := os.Create(outFile)
    if err != nil {
        log.Fatal("Error in creating file", outFile)
    }
    defer out.Close()

    enc := json.NewEncoder(out)
    for _, key := range keys {
        kv := KeyValue{key, reduceF(key, keyValues[key])}
        enc.Encode(kv)
    }
}

Part II: Single-worker word count

编写main/wc.go 中的 mapF()、reduceF()方法。
mapF() 返回一个键/值对的切片;
reduceF() 返回这个key出现了多少次,即values的长度。

func mapF(filename string, contents string) []mapreduce.KeyValue {
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    res := make([]mapreduce.KeyValue, 0)
    for _, word := range words {
        res = append(res, mapreduce.KeyValue{word, ""})
    }
    return res
}

func reduceF(key string, values []string) string {
    return strconv.Itoa(len(values))
}

Part III: Distributing MapReduce tasks

编写schedule.go中的 schedule()方法。
1.等待所有任务完成。
2.从registerChan中取出worker的地址,将任务分配给它。
注:该通道为每个工作者生成一个字符串,其中包含工作者的RPC地址。有些worker可能在调用schedule()之前存在,有些可能在schedule()运行时启动;所有这些都将出现在registerChan上。schedule()应该使用所有的worker,包括在它启动后出现的那些。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)
    wg := sync.WaitGroup{}
    wg.Add(ntasks)

    //将任务放入队列
    taskCh := make(chan int,ntasks)
    for idx:=0;idx<ntasks;idx++ {
        taskCh <- idx
    }

    //所有任务完成,关闭任务channel,退出
    go func() {
        wg.Wait()
        close(taskCh)
    }()


    for idx := range taskCh{
        arg := DoTaskArgs{
            JobName:       jobName,
            File:          mapFiles[idx],
            Phase:         phase,
            TaskNumber:    idx,
            NumOtherPhase: n_other,
        }

        worker := <- registerChan
        go func(worker string,arg DoTaskArgs,idx int) {
            if call(worker,"Worker.DoTask",arg,nil){
                wg.Done()
            }else { //call失败,将任务重新放回队列
                taskCh <- idx
            }
            //任务结束,归还工作线程
            registerChan <- worker
        }(worker,arg,idx)

    }
    fmt.Printf("Schedule: %v done\n", phase)
}

Part IV: Handling worker failures

worker 工作失败的例子
RPC失败的原因:1.请求没有达到,工作进程没有执行任务;2.工作进程可能已经执行了它,但是应答丢失;3.工作进程可能仍然在执行,但是主进程的RPC超时了。
代码实现参考part 3;

Part V: Inverted index generation (optional, does not count in grade)

生成倒排索引

func mapF(document string, value string) (res []mapreduce.KeyValue) {
    words := strings.FieldsFunc(value, func(r rune) bool {
        return !unicode.IsLetter(r)
    })
    s := make(map[string]bool)
    for _, word := range words {
        lower := strings.ToLower(word)
        upper := strings.ToUpper(word)
        _, hasUpper := s[lower]
        _, hasLower := s[upper]
        if !hasLower && !hasUpper {
            if lower == word {
                s[lower] = true
            } else if upper == word {
                s[upper] = true
            }
        }
    }

    for k, _ := range s {
        res = append(res, mapreduce.KeyValue{k, document})
    }
    return
}

func reduceF(key string, values []string) string {
    sort.Strings(values)
    return strconv.Itoa(len(values)) + " " + strings.Join(values, ",")
}
上一篇下一篇

猜你喜欢

热点阅读