
MIT 6.824 lab1:mapreduce


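This is a summary of what I learned from lab 1 of the MIT 6.824 course, recording what I gained and the pitfalls I hit along the way.

My environment is Windows 10, so I made some environment-related changes to the lab code. If you are only interested in the code, see: github/zouzhitao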

mapreduce overview

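First, a rough look at what mapreduce actually is.

My own simple understanding: mapreduce is a system that processes a user-specified job in a distributed way. It works roughly as follows.

The user provides two functions: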

mapFunc(k1,v1)-> list(k2,v2)
reduceFunc(k2,list(v2)) -> ans of k2

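The distributed system spreads the user's job across workers and eventually produces an answer for each k2. Below is how the system goes about it.

First, it has a master that schedules tasks.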

master

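  1. First it schedules workers to run the map tasks. Let the total number of map tasks be $M$. The results are stored in intermediate files m-i-j, where $i \in \{0,\dots,M-1\}, j \in \{0,\dots,R-1\}$
  2. Then it schedules workers to run the reduce tasks. Let the total number of reduce tasks be $R$. The answers are stored in $r_j$
  3. Finally it merges the answers of all reduce tasks into a single file and hands that file to the user.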
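The three steps above can be sketched as a single-process, in-memory run. This is only an illustration under assumptions: `runSequential` and `toyPartition` are hypothetical names, the nested slices stand in for the m-i-j intermediate files, and partitioning by key length is a placeholder for the key hash the lab uses.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
)

// KeyValue mirrors the (k2, v2) pairs emitted by the map function.
type KeyValue struct{ Key, Value string }

// toyPartition picks a reduce task for a key. Key length is only a
// placeholder; any deterministic function of the key would do.
func toyPartition(key string, nReduce int) int { return len(key) % nReduce }

// runSequential runs M map tasks, then R reduce tasks, then merges,
// all in memory and without any workers.
func runSequential(
	inputs []string, // one input per map task, so M = len(inputs)
	nReduce int,
	mapF func(contents string) []KeyValue,
	reduceF func(key string, values []string) string,
) map[string]string {
	// Step 1: buckets[i][j] plays the role of intermediate file m-i-j.
	buckets := make([][][]KeyValue, len(inputs))
	for i, in := range inputs {
		buckets[i] = make([][]KeyValue, nReduce)
		for _, kv := range mapF(in) {
			j := toyPartition(kv.Key, nReduce)
			buckets[i][j] = append(buckets[i][j], kv)
		}
	}
	// Steps 2 and 3: reduce task j reads bucket j of every map task,
	// and merged collects the answers of all reduce tasks.
	merged := make(map[string]string)
	for j := 0; j < nReduce; j++ {
		grouped := make(map[string][]string)
		for i := range inputs {
			for _, kv := range buckets[i][j] {
				grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
			}
		}
		for k, vs := range grouped {
			merged[k] = reduceF(k, vs)
		}
	}
	return merged
}

// wordCountMap emits one pair per whitespace-separated word.
func wordCountMap(contents string) []KeyValue {
	var out []KeyValue
	for _, w := range strings.Fields(contents) {
		out = append(out, KeyValue{Key: w, Value: "1"})
	}
	return out
}

// wordCountReduce returns how many values were collected for the key.
func wordCountReduce(key string, values []string) string {
	return strconv.Itoa(len(values))
}

func main() {
	res := runSequential([]string{"go lab go", "lab mit x"}, 2, wordCountMap, wordCountReduce)
	keys := make([]string, 0, len(res))
	for k := range res {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	for _, k := range keys {
		fmt.Println(k, res[k])
	}
}
```

The real system differs mainly in that each bucket is a file on a shared file system and each loop iteration is a task handed to a worker.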

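The details are all in the lab.

detail

This part walks through the lab content by reading the code, though not in the lab's order. In my view the point of doing the lab is not the lab itself but understanding the mapreduce system.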

master

Let's first look at the master's code.

// Master holds all the state that the master needs to keep track of.
type Master struct {
    sync.Mutex

    address     string
    doneChannel chan bool

    // protected by the mutex
    newCond *sync.Cond // signals when Register() adds to workers[]
    workers []string   // each worker's UNIX-domain socket name -- its RPC address

    // Per-task information
    jobName string   // Name of currently executing job
    files   []string // Input files
    nReduce int      // Number of reduce partitions

    shutdown chan struct{}
    l        net.Listener
    stats    []int
}

The master maintains all the state needed to run one job.

master.run

This is what the master actually does.

// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
    mr = newMaster(master)
    mr.startRPCServer()
    go mr.run(jobName, files, nreduce,
        func(phase jobPhase) {
            ch := make(chan string) // carries worker RPC addresses
            go mr.forwardRegistrations(ch)
            schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
        },
        func() {
            mr.stats = mr.killWorkers()
            mr.stopRPCServer()
        })
    return
}

// run executes a mapreduce job on the given number of mappers and reducers.
//
// First, it divides up the input file among the given number of mappers, and
// schedules each task on workers as they become available. Each map task bins
// its output in a number of bins equal to the given number of reduce tasks.
// Once all the mappers have finished, workers are assigned reduce tasks.
//
// When all tasks have been completed, the reducer outputs are merged,
// statistics are collected, and the master is shut down.
//
// Note that this implementation assumes a shared file system.
func (mr *Master) run(jobName string, files []string, nreduce int,
    schedule func(phase jobPhase),
    finish func(),
) {
    mr.jobName = jobName
    mr.files = files
    mr.nReduce = nreduce

    fmt.Printf("%s: Starting Map/Reduce task %s\n", mr.address, mr.jobName)

    schedule(mapPhase)
    schedule(reducePhase)
    finish()
    mr.merge()

    fmt.Printf("%s: Map/Reduce task completed\n", mr.address)

    mr.doneChannel <- true
}

schedule

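What we actually have to implement is schedule, which is also the core piece. schedule does the task scheduling. Note that there are $M$ map tasks and $R$ reduce tasks but only $n$ workers. Usually $M>n$ and $R>n$, so that the workers are kept as busy as possible and the pipeline stays full.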

//
// schedule() starts and waits for all tasks in the given phase (mapPhase
// or reducePhase). the mapFiles argument holds the names of the files that
// are the inputs to the map phase, one per map task. nReduce is the
// number of reduce tasks. the registerChan argument yields a stream
// of registered workers; each item is the worker's RPC address,
// suitable for passing to call(). registerChan will yield all
// existing registered workers (if any) and new ones as they register.
//
func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
    var ntasks int
    var nOther int // number of inputs (for reduce) or outputs (for map)
    switch phase {
    case mapPhase:
        ntasks = len(mapFiles)
        nOther = nReduce
    case reducePhase:
        ntasks = nReduce
        nOther = len(mapFiles)
    }

    fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, nOther)

    // All ntasks tasks have to be scheduled on workers. Once all tasks
    // have completed successfully, schedule() should return.
    //
    // Your code here (Part III, Part IV).
    //

    //Part III

    var wg sync.WaitGroup
    wg.Add(ntasks)
    for i := 0; i < ntasks; i++ {
        go func(i int) {
            defer wg.Done()
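            filename := ""
            if phase == mapPhase {
                // only map tasks read an input file; indexing mapFiles in the
                // reduce phase could run past the end of the slice
                filename = mapFiles[i]
            }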
            taskArgs := DoTaskArgs{
                JobName:       jobName,
                File:          filename,
                Phase:         phase,
                TaskNumber:    i,
                NumOtherPhase: nOther,
            }

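            taskFinished := false

            // retry on another worker until the task succeeds
            for !taskFinished {
                workAddr := <-registerChan
                taskFinished = call(workAddr, "Worker.DoTask", taskArgs, nil)
                // hand the worker back in a goroutine so that the send on the
                // unbuffered registerChan cannot block this loop
                go func() { registerChan <- workAddr }()
            }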
        }(i)

    }
    wg.Wait()
    fmt.Printf("Schedule: %v done\n", phase)
}

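What schedule does: for each task, it uses the call function to make an RPC that has a worker run Worker.DoTask. This is the code for Part III/IV.

A few details to note here:

  1. registerChan is a channel that carries the addresses of available workers, so after a task finishes, the worker's address has to be put back into registerChan. The channel is unbuffered, which is why the code sends from a separate goroutine: once the last task is done, nobody may be receiving any more.
  2. The master schedules the phases serially: it waits for all map tasks to finish before it schedules any reduce task. So schedule must not return early and has to wait until all tasks have completed.

Next, what does call actually do? It invokes Worker.DoTask on the worker, so a quick look at what Worker.DoTask does is enough.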
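The two points above can be tried out without any RPC. The following is a minimal sketch, assuming a fake `call` that fails the first attempt of every even-numbered task; `runTasks` and `simulate` are hypothetical names and not part of the lab code.

```go
package main

import (
	"fmt"
	"sync"
)

// runTasks schedules ntasks tasks on the workers arriving on registerChan.
// call is a stand-in for the RPC helper; it reports whether the task
// succeeded on that worker.
func runTasks(ntasks int, registerChan chan string, call func(worker string, task int) bool) {
	var wg sync.WaitGroup
	wg.Add(ntasks)
	for i := 0; i < ntasks; i++ {
		go func(task int) {
			defer wg.Done()
			for {
				worker := <-registerChan // wait for an idle worker
				ok := call(worker, task)
				// Hand the worker back without blocking: the channel is
				// unbuffered, and after the last task nobody may be receiving.
				go func() { registerChan <- worker }()
				if ok {
					return
				}
			}
		}(i)
	}
	wg.Wait() // do not return until every task has succeeded
}

// simulate runs ntasks tasks on the given workers and returns how many
// attempts each task needed.
func simulate(ntasks int, workers []string) map[int]int {
	registerChan := make(chan string)
	go func() {
		for _, w := range workers {
			registerChan <- w
		}
	}()

	var mu sync.Mutex
	attempts := make(map[int]int)
	// Simulated RPC: the first attempt of every even-numbered task fails.
	call := func(worker string, task int) bool {
		mu.Lock()
		defer mu.Unlock()
		attempts[task]++
		return !(task%2 == 0 && attempts[task] == 1)
	}

	runTasks(ntasks, registerChan, call)
	return attempts
}

func main() {
	attempts := simulate(5, []string{"worker-0", "worker-1"})
	fmt.Println("attempts for task 0:", attempts[0])
	fmt.Println("attempts for task 1:", attempts[1])
}
```

As in the schedule code above, a worker is put back on the channel even when the call failed. That is harmless with this fake `call`, but a worker that has really crashed would keep being handed out and failing.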

worker

// DoTask is called by the master when a new task is being scheduled on this
// worker.
func (wk *Worker) DoTask(arg *DoTaskArgs, _ *struct{}) error {
    //...
    switch arg.Phase {
    case mapPhase:
        doMap(arg.JobName, arg.TaskNumber, arg.File, arg.NumOtherPhase, wk.Map)
    case reducePhase:
        doReduce(arg.JobName, arg.TaskNumber, mergeName(arg.JobName, arg.TaskNumber), arg.NumOtherPhase, wk.Reduce)
    }
    //....
}

At its core it just calls doMap or doReduce.

This is also the content of Part I. Let's see what doMap and doReduce do.

doMap

func doMap(
    jobName string, // the name of the MapReduce job
    mapTask int, // which map task this is
    inFile string,
    nReduce int, // the number of reduce task that will be run ("R" in the paper)
    mapF func(filename string, contents string) []KeyValue,
) {
    //
    // doMap manages one map task: it should read one of the input files
    // (inFile), call the user-defined map function (mapF) for that file's
    // contents, and partition mapF's output into nReduce intermediate files.
    //
    // There is one intermediate file per reduce task. The file name
    // includes both the map task number and the reduce task number. Use
    // the filename generated by reduceName(jobName, mapTask, r)
    // as the intermediate file for reduce task r. Call ihash() (see
    // below) on each key, mod nReduce, to pick r for a key/value pair.
    //
    // mapF() is the map function provided by the application. The first
    // argument should be the input file name, though the map function
    // typically ignores it. The second argument should be the entire
    // input file contents. mapF() returns a slice containing the
    // key/value pairs for reduce; see common.go for the definition of
    // KeyValue.
    //
    // Look at Go's ioutil and os packages for functions to read
    // and write files.
    //
    // Coming up with a scheme for how to format the key/value pairs on
    // disk can be tricky, especially when taking into account that both
    // keys and values could contain newlines, quotes, and any other
    // character you can think of.
    //
    // One format often used for serializing data to a byte stream that the
    // other end can correctly reconstruct is JSON. You are not required to
    // use JSON, but as the output of the reduce tasks *must* be JSON,
    // familiarizing yourself with it here may prove useful. You can write
    // out a data structure as a JSON string to a file using the commented
    // code below. The corresponding decoding functions can be found in
    // common_reduce.go.
    //
    //   enc := json.NewEncoder(file)
    //   for _, kv := ... {
    //     err := enc.Encode(&kv)
    //
    // Remember to close the file after you have written all the values!
    //
    // Your code here (Part I).
    //

    content := safeReadFile(inFile)
    ans := mapF(inFile, string(content))
    jsonEncoder := make([]*json.Encoder, nReduce)

    for i := 0; i < nReduce; i++ {
        f := safeCreaFile(reduceName(jobName, mapTask, i))
        jsonEncoder[i] = json.NewEncoder(f)
        defer f.Close()
    }
    for _, kv := range ans {
        r := ihash(kv.Key) % nReduce
        err := jsonEncoder[r].Encode(&kv)
        if err != nil {
            log.Fatal("jsonEncode err", err)
        }
    }
}
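
  1. Read the contents of the input file
  2. Call the user's mapF to produce a list of key/value pairs, then hash each pair by its key into one of the reduce files
    In other words, each map task produces $nReduce$ intermediate files, so $M \times R$ intermediate files are produced in total. And because pairs are assigned to reduce tasks by hashing the key, the same key is guaranteed to end up in the same reduce task.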
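The partitioning step can be looked at in isolation. The sketch below assumes an FNV-1a based hash (`ihashSketch`, which may differ from the lab's own `ihash`) and reuses the m-i-j naming from the overview instead of the lab's `reduceName` format. Both helpers are illustrative only.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// ihashSketch is an assumed FNV-1a based hash of the key.
func ihashSketch(s string) int {
	h := fnv.New32a()
	h.Write([]byte(s))
	return int(h.Sum32() & 0x7fffffff) // clear the top bit so the result is non-negative
}

// intermediateName follows the m-i-j naming used in the overview.
func intermediateName(mapTask, reduceTask int) string {
	return fmt.Sprintf("m-%d-%d", mapTask, reduceTask)
}

// targetFile returns the intermediate file that map task mapTask
// writes the given key to.
func targetFile(key string, mapTask, nReduce int) string {
	return intermediateName(mapTask, ihashSketch(key)%nReduce)
}

func main() {
	const nMap, nReduce = 3, 2
	for m := 0; m < nMap; m++ {
		fmt.Printf("map task %d writes key %q to %s\n", m, "apple", targetFile("apple", m, nReduce))
	}
	fmt.Println("total intermediate files:", nMap*nReduce)
}
```

Whatever reduce index the hash yields for "apple", it is the same in every map task, which is what lets a single reduce task see all values for that key.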

reduce

func doReduce(
    jobName string, // the name of the whole MapReduce job
    reduceTask int, // which reduce task this is
    outFile string, // write the output here
    nMap int, // the number of map tasks that were run ("M" in the paper)
    reduceF func(key string, values []string) string,
) {
    //
    // doReduce manages one reduce task: it should read the intermediate
    // files for the task, sort the intermediate key/value pairs by key,
    // call the user-defined reduce function (reduceF) for each key, and
    // write reduceF's output to disk.
    //
    // You'll need to read one intermediate file from each map task;
    // reduceName(jobName, m, reduceTask) yields the file
    // name from map task m.
    //
    // Your doMap() encoded the key/value pairs in the intermediate
    // files, so you will need to decode them. If you used JSON, you can
    // read and decode by creating a decoder and repeatedly calling
    // .Decode(&kv) on it until it returns an error.
    //
    // You may find the first example in the golang sort package
    // documentation useful.
    //
    // reduceF() is the application's reduce function. You should
    // call it once per distinct key, with a slice of all the values
    // for that key. reduceF() returns the reduced value for that key.
    //
    // You should write the reduce output as JSON encoded KeyValue
    // objects to the file named outFile. We require you to use JSON
    // because that is what the merger than combines the output
    // from all the reduce tasks expects. There is nothing special about
    // JSON -- it is just the marshalling format we chose to use. Your
    // output code will look something like this:
    //
    // enc := json.NewEncoder(file)
    // for key := ... {
    //  enc.Encode(KeyValue{key, reduceF(...)})
    // }
    // file.Close()
    //
    // Your code here (Part I).
    //

    kvs := make(map[string][]string)
    for i := 0; i < nMap; i++ {
        kv := jsonDecode(reduceName(jobName, i, reduceTask))
        for _, v := range kv {
            kvs[v.Key] = append(kvs[v.Key], v.Value)
        }
    }
    f := safeCreaFile(outFile)
    defer f.Close()
    enc := json.NewEncoder(f)
    for k, v := range kvs {
        reduceAns := reduceF(k, v)
        // do not silently drop a failed write
        if err := enc.Encode(KeyValue{k, reduceAns}); err != nil {
            log.Fatal("jsonEncode err", err)
        }
    }
}

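What reduce does is also simple. It first reads all the intermediate files assigned to it and groups their contents into a list of values per key.

Then it calls the user's reduceF for each key and writes the answers, JSON-encoded, to a file.

End of Part I.

Next come two examples.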
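The read-and-group half of this can be sketched on an in-memory buffer instead of a file. `decodeAll` is a hypothetical stand-in for the `jsonDecode` helper used in the code above (whose body is not shown here), and `roundTrip` exists only to make the example self-contained.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"log"
)

// KeyValue matches the pair type written by the map side.
type KeyValue struct {
	Key   string
	Value string
}

// decodeAll reads JSON-encoded KeyValue objects from r until EOF.
func decodeAll(r io.Reader) ([]KeyValue, error) {
	var kvs []KeyValue
	dec := json.NewDecoder(r)
	for {
		var kv KeyValue
		err := dec.Decode(&kv)
		if err == io.EOF {
			return kvs, nil // clean end of stream
		}
		if err != nil {
			return nil, err
		}
		kvs = append(kvs, kv)
	}
}

// groupByKey collects all values that share a key, in input order.
func groupByKey(kvs []KeyValue) map[string][]string {
	grouped := make(map[string][]string)
	for _, kv := range kvs {
		grouped[kv.Key] = append(grouped[kv.Key], kv.Value)
	}
	return grouped
}

// roundTrip encodes kvs the way the map side does, then decodes and groups them.
func roundTrip(kvs []KeyValue) (map[string][]string, error) {
	var buf bytes.Buffer
	enc := json.NewEncoder(&buf)
	for _, kv := range kvs {
		if err := enc.Encode(&kv); err != nil {
			return nil, err
		}
	}
	decoded, err := decodeAll(&buf)
	if err != nil {
		return nil, err
	}
	return groupByKey(decoded), nil
}

func main() {
	grouped, err := roundTrip([]KeyValue{{"a", "x\ny"}, {"b", "2"}, {"a", "3"}})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(len(grouped["a"]), len(grouped["b"]))
}
```

The value "x\ny" survives the round trip unchanged, which is the reason the lab comments suggest JSON: keys and values may contain newlines or quotes.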

example

The two examples here are word count and inverted index.

word count

This job counts how many times each word occurs.

//
// The map function is called once for each file of input. The first
// argument is the name of the input file, and the second is the
// file's complete contents. You should ignore the input file name,
// and look only at the contents argument. The return value is a slice
// of key/value pairs.
//
func mapF(filename string, contents string) []mapreduce.KeyValue {
    // Your code here (Part II).
    var ret []mapreduce.KeyValue
    // split on every rune that is not a letter
    words := strings.FieldsFunc(contents, func(x rune) bool {
        return !unicode.IsLetter(x)
    })
    for _, w := range words {
        kv := mapreduce.KeyValue{w, ""}
        ret = append(ret, kv)
    }
    return ret
}

//
// The reduce function is called once for each key generated by the
// map tasks, with a list of all the values created for that key by
// any map task.
//
func reduceF(key string, values []string) string {
    // Your code here (Part II).
    return strconv.Itoa(len(values))
}

End of Part II.

One thing to note here: the test compares with diff, which treats \n and \r\n as different, so make sure the line endings in the ans file are converted to \n.
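If the expected-output file ends up with Windows line endings, converting it is a one-liner. A minimal sketch, where `normalizeNewlines` is a hypothetical helper and not part of the lab:

```go
package main

import (
	"fmt"
	"strings"
)

// normalizeNewlines converts Windows (\r\n) line endings to Unix (\n).
func normalizeNewlines(s string) string {
	return strings.ReplaceAll(s, "\r\n", "\n")
}

func main() {
	fmt.Printf("%q\n", normalizeNewlines("a: 2\r\nb: 1\r\n")) // prints "a: 2\nb: 1\n"
}
```

Text that already uses \n is left unchanged, so the conversion is safe to apply more than once.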

inverted index

// The mapping function is called once for each piece of the input.
// In this framework, the key is the name of the file that is being processed,
// and the value is the file's contents. The return value should be a slice of
// key/value pairs, each represented by a mapreduce.KeyValue.
func mapF(document string, value string) (res []mapreduce.KeyValue) {
    // Your code here (Part V).
    // split on every rune that is not a letter
    words := strings.FieldsFunc(value, func(x rune) bool {
        return !unicode.IsLetter(x)
    })
    kvmap := make(map[string]string)
    for _, w := range words {
        kvmap[w] = document
    }
    for k, v := range kvmap {
        res = append(res, mapreduce.KeyValue{k, v})
    }
    return
}

// The reduce function is called once for each key generated by Map, with a
// list of that key's string value (merged across all inputs). The return value
// should be a single output value for that key.
func reduceF(key string, values []string) string {
    // Your code here (Part V).
    numberOfDoc := len(values)
    sort.Strings(values)
    res := strconv.Itoa(numberOfDoc) + " " + strings.Join(values, ",")

    return res
}

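Note that repeated words within the same document have to be deduplicated here. Storing them in a map is enough.

Finally, the environment pitfalls.

Notes for the Windows environment

  1. The UNIX-domain socket file addresses that the lab uses for registration did not work, so I changed them to TCP
  2. After switching to TCP, make sure the worker closes its TCP connection when it shuts down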
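To see why the deduplication matters, here is a small sketch that builds the index for two made-up documents. `uniqueWords` and `postings` are hypothetical helpers that mirror what the mapF and reduceF above do, and the file names are invented.

```go
package main

import (
	"fmt"
	"sort"
	"strconv"
	"strings"
	"unicode"
)

// uniqueWords returns the distinct words of contents in sorted order.
func uniqueWords(contents string) []string {
	seen := make(map[string]struct{})
	fields := strings.FieldsFunc(contents, func(r rune) bool { return !unicode.IsLetter(r) })
	for _, w := range fields {
		seen[w] = struct{}{}
	}
	words := make([]string, 0, len(seen))
	for w := range seen {
		words = append(words, w)
	}
	sort.Strings(words)
	return words
}

// postings formats the reduce output for one word: "<count> <doc1>,<doc2>,...".
func postings(docs []string) string {
	sorted := append([]string(nil), docs...) // copy so the caller's slice is not reordered
	sort.Strings(sorted)
	return strconv.Itoa(len(sorted)) + " " + strings.Join(sorted, ",")
}

func main() {
	docs := map[string]string{
		"a.txt": "go go gopher",
		"b.txt": "go lab",
	}
	index := make(map[string][]string)
	for name, contents := range docs {
		for _, w := range uniqueWords(contents) {
			index[w] = append(index[w], name)
		}
	}
	fmt.Println(postings(index["go"])) // prints "2 a.txt,b.txt"
}
```

Without the set, "go" would be emitted twice for a.txt and the count for that word would come out as 3 instead of 2.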
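One possible shape of that change is to pick the network type per platform and leave the rest of the code unchanged. This is only a sketch under assumptions: `listenTarget` and `openListener` are hypothetical names, and binding to `127.0.0.1:0` (letting the OS pick a free port) is my choice for illustration, not necessarily what the repository does.

```go
package main

import (
	"fmt"
	"log"
	"net"
)

// listenTarget returns the network and address to pass to net.Listen.
// On Windows it falls back to a loopback TCP port chosen by the OS.
func listenTarget(goos, socketPath string) (network, address string) {
	if goos == "windows" {
		return "tcp", "127.0.0.1:0"
	}
	return "unix", socketPath
}

// openListener opens a listener suitable for the given platform.
func openListener(goos, socketPath string) (net.Listener, error) {
	network, address := listenTarget(goos, socketPath)
	return net.Listen(network, address)
}

func main() {
	// Force the Windows branch so the example behaves the same everywhere.
	l, err := openListener("windows", "unused-socket-path")
	if err != nil {
		log.Fatal(err)
	}
	// Close on shutdown so the port is released.
	defer l.Close()
	fmt.Println("listening on", l.Addr().Network())
}
```

With port 0 the actual address is only known after listening, so whatever a worker registers with the master would have to come from `l.Addr().String()`.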

reference

  1. google mapreduce paper
  2. lab1
  3. github/zouzhitao code repo

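Copyright notice

This work is the author's original article, licensed under the Creative Commons Attribution-NonCommercial-ShareAlike 4.0 International License.

Author: taotao

Please retain this copyright notice and credit the source when reposting.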
