MapReduce: Translation and Notes

2019-11-04  hugo54

Execution Overview

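  1. Split the input files into M pieces. Start many copies of the program on a cluster of machines. One special copy is the master; the rest are workers, which register with the master. There are M map tasks and R reduce tasks waiting to be assigned to workers.

  2. A worker assigned a map task reads the corresponding input piece, passes it to the Map function, and buffers the resulting intermediate key-value pairs in memory. (In practice, the buffered pairs are periodically written to local disk, divided into R regions by the partitioning function. The locations of these on-disk regions are passed back to the master.)

  3. A worker assigned a reduce task learns the locations of the map output from the master and uses RPC to read it. Once a reduce worker has read all of its intermediate data, it sorts the data by key so that all pairs with the same key are grouped together. (If the data does not fit in memory, an external sort is used.)

  4. The reduce worker iterates over the sorted data and, for each unique key, passes the key and its set of values to Reduce(key string, values []string) string. The output of the Reduce function is appended to the final output for this reduce partition.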
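
The four steps above can be sketched in a single process. This is only an illustration, not the distributed implementation: `runSequential` and `partition` are hypothetical names, the regions live in memory instead of on local disk, and hash(key) mod R is assumed as the partitioning function.

```go
package main

import (
    "hash/fnv"
    "sort"
)

// KeyValue mirrors the intermediate pair type used in these notes.
type KeyValue struct {
    Key   string
    Value string
}

// partition is an assumed partitioning function: hash(key) mod R.
func partition(key string, r int) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32() % uint32(r))
}

// runSequential runs one map task per input piece, splits the intermediate
// pairs into R in-memory regions, then sorts and reduces each region.
// It returns one key -> result map per reduce partition.
func runSequential(
    inputs map[string]string, // filename -> contents (the M pieces)
    r int,
    mapF func(string, string) []KeyValue,
    reduceF func(string, []string) string,
) []map[string]string {
    // Steps 1-2: run every map task and route its output to a region.
    regions := make([][]KeyValue, r)
    for name, contents := range inputs {
        for _, kv := range mapF(name, contents) {
            p := partition(kv.Key, r)
            regions[p] = append(regions[p], kv)
        }
    }

    // Steps 3-4: sort each region by key, then call reduceF once per key.
    outputs := make([]map[string]string, r)
    for p, kvs := range regions {
        sort.Slice(kvs, func(i, j int) bool { return kvs[i].Key < kvs[j].Key })
        out := make(map[string]string)
        for i := 0; i < len(kvs); {
            j := i
            var values []string
            for j < len(kvs) && kvs[j].Key == kvs[i].Key {
                values = append(values, kvs[j].Value)
                j++
            }
            out[kvs[i].Key] = reduceF(kvs[i].Key, values)
            i = j
        }
        outputs[p] = out
    }
    return outputs
}
```

Because equal keys always hash to the same region, every key is reduced exactly once, by exactly one partition.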

Types

// map    (k1, v1)       → list(k2, v2)
// reduce (k2, list(v2)) → list(v2)

type KeyValue struct {
    Key   string
    Value string
}

func mapFuc(key string, value string) []KeyValue

// Typically just zero or one output value is produced per Reduce invocation,
// so reduceFuc returns a single string
func reduceFuc(key string, values []string) string

Example

1. Word Frequency Count over a Large Collection of Documents

func mapFuc(filename string, contents string) []KeyValue {
    var output []KeyValue
    keys := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r) && !unicode.IsNumber(r)
    })
    for _, k := range keys {
        output = append(output, KeyValue{Key: k, Value: "1"})
    }
    return output
}

func reduceFuc(key string, values []string) string {
    num := 0
    for _, value := range values {
        i, _ := strconv.Atoi(value)
        num += i
    }
    return strconv.Itoa(num)
}

2. Distributed Grep

const pattern = "your pattern here"

// Compile the pattern once instead of on every line
var re = regexp.MustCompile(pattern)

// The map function emits a line if it matches a supplied pattern
func mapFuc(filename string, contents string) []KeyValue {
    var output []KeyValue

    // separate the file contents into lines
    lines := strings.FieldsFunc(contents, func(r rune) bool {
        return r == '\n'
    })
    for _, line := range lines {
        if re.MatchString(line) {
            output = append(output, KeyValue{Key: line, Value: ""})
        }
    }
    return output
}

// The reduce function is an identity function that
// just copies the supplied intermediate data to the output
func reduceFuc(key string, values []string) string {
    return key
}

3. Count of URL Access Frequency

// The map function processes logs of web page requests and outputs <URL, 1>
func mapFuc(filename string, contents string) []KeyValue {
    panic("not implemented")
}

// The reduce function adds together all values for the same URL
// and emits a <URL, total count> pair
func reduceFuc(key string, values []string) string {
    panic("not implemented")
}
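
A minimal sketch of these two functions follows. The notes do not specify a log format, so this assumes one request per line with the URL as the first whitespace-separated field. The names `urlCountMap` and `urlCountReduce` are hypothetical.

```go
package main

import (
    "strconv"
    "strings"
)

type KeyValue struct {
    Key   string
    Value string
}

// urlCountMap emits <URL, "1"> for every request line.
// Assumption: the URL is the first whitespace-separated field of each line.
func urlCountMap(filename string, contents string) []KeyValue {
    var output []KeyValue
    for _, line := range strings.Split(contents, "\n") {
        fields := strings.Fields(line)
        if len(fields) == 0 {
            continue // skip blank lines
        }
        output = append(output, KeyValue{Key: fields[0], Value: "1"})
    }
    return output
}

// urlCountReduce sums the counts emitted for one URL.
func urlCountReduce(key string, values []string) string {
    total := 0
    for _, v := range values {
        n, err := strconv.Atoi(v)
        if err != nil {
            continue // ignore malformed counts
        }
        total += n
    }
    return strconv.Itoa(total)
}
```

Apart from how the key is extracted, this has the same shape as the word count example.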

4. Reverse Web-Link Graph

// The map function outputs <target, source> pairs for each link
// to a target URL found in a page named source
func mapFuc(filename string, contents string) []KeyValue {
    panic("not implemented")
}

// The reduce function concatenates the list of all source URLs
// associated with a given target URL and emits the pair: <target, list(source)>
func reduceFuc(key string, values []string) string {
    panic("not implemented")
}
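
One possible way to fill in the reverse web-link graph is sketched below. It makes several assumptions that are not in the notes: pages are HTML, links appear as `href="..."` attributes, the file name serves as the source page name, and list(source) is encoded as a comma-separated string. `reverseLinkMap`, `reverseLinkReduce` and `hrefRe` are hypothetical names.

```go
package main

import (
    "regexp"
    "sort"
    "strings"
)

type KeyValue struct {
    Key   string
    Value string
}

// hrefRe is a deliberately simple link matcher, not a full HTML parser.
var hrefRe = regexp.MustCompile(`href="([^"]+)"`)

// reverseLinkMap emits <target, source> for every link found in the page.
func reverseLinkMap(source string, contents string) []KeyValue {
    var output []KeyValue
    for _, m := range hrefRe.FindAllStringSubmatch(contents, -1) {
        output = append(output, KeyValue{Key: m[1], Value: source})
    }
    return output
}

// reverseLinkReduce joins all sources that link to one target.
// The sources are sorted so the output is deterministic.
func reverseLinkReduce(target string, sources []string) string {
    sorted := append([]string(nil), sources...)
    sort.Strings(sorted)
    return strings.Join(sorted, ",")
}
```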

5. Term-Vector per Host

/*
    A term vector summarizes the most important words that occur in a document
    or a set of documents as a list of <word, frequency> pairs.

    What is a term vector?
    http://www.inf.ed.ac.uk/teaching/courses/tts/pdf/vspace-2x2.pdf
*/

// The map function emits a <hostname, term vector> pair for each input document.
// (where the hostname is extracted from the URL of the document)
func mapFuc(filename string, contents string) []KeyValue {
    panic("not implemented")
}

// The reduce function is passed all per-document term vectors for a given host.
// It adds these term vectors together, throwing away infrequent terms,
// and then emits a final <hostname, term vector> pair.
func reduceFuc(key string, values []string) string {
    panic("not implemented")
}
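
The sketch below shows one way the term-vector computation could look. Everything specific in it is an assumption: the input name is taken to be the document URL, a term vector is encoded as space-separated `word:count` entries, and `minCount` is a made-up threshold for "infrequent". All function names are hypothetical.

```go
package main

import (
    "net/url"
    "sort"
    "strconv"
    "strings"
    "unicode"
)

type KeyValue struct {
    Key   string
    Value string
}

// minCount is an assumed cut-off: terms seen fewer times are dropped.
const minCount = 2

// encodeVector renders counts as "word:count" entries sorted by word.
func encodeVector(counts map[string]int) string {
    words := make([]string, 0, len(counts))
    for w := range counts {
        words = append(words, w)
    }
    sort.Strings(words)
    parts := make([]string, 0, len(words))
    for _, w := range words {
        parts = append(parts, w+":"+strconv.Itoa(counts[w]))
    }
    return strings.Join(parts, " ")
}

// termVectorMap emits one <hostname, term vector> pair per document.
func termVectorMap(docURL string, contents string) []KeyValue {
    u, err := url.Parse(docURL)
    if err != nil || u.Hostname() == "" {
        return nil // no usable hostname, emit nothing
    }
    counts := make(map[string]int)
    words := strings.FieldsFunc(strings.ToLower(contents), func(r rune) bool {
        return !unicode.IsLetter(r) && !unicode.IsNumber(r)
    })
    for _, w := range words {
        counts[w]++
    }
    return []KeyValue{{Key: u.Hostname(), Value: encodeVector(counts)}}
}

// termVectorReduce adds the per-document vectors for one host
// and drops terms whose total count is below minCount.
func termVectorReduce(host string, vectors []string) string {
    total := make(map[string]int)
    for _, vec := range vectors {
        for _, part := range strings.Fields(vec) {
            i := strings.LastIndex(part, ":")
            if i < 0 {
                continue // malformed entry
            }
            n, err := strconv.Atoi(part[i+1:])
            if err != nil {
                continue
            }
            total[part[:i]] += n
        }
    }
    for w, n := range total {
        if n < minCount {
            delete(total, w)
        }
    }
    return encodeVector(total)
}
```

Filtering happens only in the reduce step, so a term that is rare in each document but common across the host still survives.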

6. Inverted Index

// The map function parses each document,
// and emits a sequence of <word, document ID> pairs.
func mapFuc(filename string, contents string) []KeyValue {
    var output []KeyValue
    words := strings.FieldsFunc(contents, func(r rune) bool {
        return !unicode.IsLetter(r) && !unicode.IsNumber(r)
    })
    for _, word := range words {
        output = append(output, KeyValue{Key: word, Value: filename})
    }
    return output
}

// The reduce function accepts all pairs for a given word,
// sorts the corresponding document IDs and emits a <word, list(documentID)> pair.
// The set of all output pairs forms a simple inverted index.
// It is easy to augment this computation to keep track of word positions.
func reduceFuc(key string, values []string) string {
    // Sort a copy so the caller's slice is left untouched
    ids := append([]string(nil), values...)
    sort.Strings(ids)
    // Joining with "," is an assumed encoding for list(documentID)
    return strings.Join(ids, ",")
}

7. Distributed Sort

// The map function extracts the key from each record, and emits a <key, record> pair.
func mapFuc(filename string, contents string) []KeyValue {
    panic("not implemented")
}

// The reduce function emits all pairs unchanged.
// (This computation depends on the partitioning facilities described in
// Section 4.1 and the ordering properties described in Section 4.2)
func reduceFuc(key string, values []string) string {
    panic("not implemented")
}
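
As a rough sketch of why the sort depends on partitioning: if keys are routed by range rather than by hash, every key in partition i sorts no later than every key in partition i+1, so concatenating the sorted partitions gives a globally sorted result. The code assumes one record per line with the key as the first field, and split points chosen in advance. `sortMap`, `sortReduce` and `rangePartition` are hypothetical names.

```go
package main

import (
    "sort"
    "strings"
)

type KeyValue struct {
    Key   string
    Value string
}

// sortMap emits <key, record> for every non-empty line.
// Assumption: the key is the first whitespace-separated field of the record.
func sortMap(filename string, contents string) []KeyValue {
    var output []KeyValue
    for _, record := range strings.Split(contents, "\n") {
        fields := strings.Fields(record)
        if len(fields) == 0 {
            continue
        }
        output = append(output, KeyValue{Key: fields[0], Value: record})
    }
    return output
}

// sortReduce passes the records for one key through unchanged.
func sortReduce(key string, records []string) string {
    return strings.Join(records, "\n")
}

// rangePartition maps a key to a partition using sorted split points.
// With n split points there are n+1 partitions, and the mapping
// preserves order: k1 < k2 implies partition(k1) <= partition(k2).
func rangePartition(key string, splitPoints []string) int {
    return sort.SearchStrings(splitPoints, key)
}
```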

How To Handle Worker Failures?

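The master periodically pings every registered worker. If a worker does not respond within a certain time (whether it is merely slow or has actually crashed), the master marks it as failed. Tasks assigned to a failed worker are reassigned to idle workers:

If a map task is executed first by worker A and then by worker B (because A failed), every worker running a reduce task is notified of the re-execution. Any reduce task that has not yet read the data produced by worker A then reads it from worker B instead.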

How To Handle Master Failure?

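The master periodically writes its data structures to disk as checkpoints. If the master dies, a new copy can be started from the latest checkpoint.

Once the master has failed, the MapReduce computation is stopped to ensure consistency.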
