6.824-Lab1: MapReduce

2019-03-01  本文已影响0人  vickeex

MapReduce, 批处理的典型之一。主要思想即“分而治之”,将一大批数据(一个大任务)分成多个子任务,分别进行运算(同时)(map),再将运算结果合起来(reduce)。
master: 负责任务调度
mapper: 执行各个子任务,map运算
reducer: 执行结果汇总,reduce运算

例:在K/V的wordCount中,源数据为一个大文件,每个mapper负责一部分文件的count,mapper的运算结果(intermediate K/Vs)再交由reducer进行汇总(merge)。
本次实验中对MapReduce进行了基本的使用和简单的任务分发实现。

Part 1: Map/Reduce input and output

task: 完成Map/Reduce的工作流程doMap()和doReduce部分

Part 2: 实现单节点的wordCount

task: 即实现task1中调用的mapF()和reduceF函数。

Part3: distributing MapReduce tasks

task: 实现schedule.go文件,完成对多个worker进行map、reduce 任务的调度。

    var ntasks int
    var n_other int // number of inputs (for reduce) or outputs (for map)
    switch phase {  // 确定当前工作阶段,执行map还是reduce调度
    case mapPhase:
        ntasks = len(mapFiles)
        n_other = nReduce
    case reducePhase:
        ntasks = nReduce
        n_other = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

    var wg sync.WaitGroup
    taskChan := make(chan int)
    go func() {
        for t := 0; t < ntasks; t++ {
            wg.Add(1)
            taskChan <- t
        }
        wg.Wait()  // 直至ntasks个任务全完成
        close(taskChan)
    }()

    for task := range taskChan {
        addr := <-registerChan  // 取一空闲worker
        go func(j int, addr string) {  // 通过RPC调用执行任务
            result := call(addr, "Worker.DoTask", DoTaskArgs{jobName, mapFiles[j], phase, j, n_other}, nil)
            fmt.Println("46 current: ", j)
            if result {
                wg.Done()
            } else {
                taskChan <- j  // 该任务失败,需再次放入taskChan以便重新执行,直至成功
            }
            registerChan <- addr  // 执行完任务的worker放回registerChan,以继续执行其他任务 
        }(task, addr)
    }
    wg.Wait()

    fmt.Printf("Schedule: %v done\n", phase)
}

重点:使用goroutine+channel+waitgroup进行协程同步和通信

上一篇 下一篇

猜你喜欢

热点阅读