1. 程式人生 > >[分散式系統學習] 6.824 LEC1 MapReduce 筆記

[分散式系統學習] 6.824 LEC1 MapReduce 筆記

什麼是Map-Reduce呢?

Map指的是一個形如下面定義的函式。

def Map(k, v): //return [(k1, v1), (k2, v2), (k3, v3), ...]
    pass

它接受一個key和一個value,返回一組所謂的中間值。注意,返回的不是一個dict,所以k1可能等於k2。

Reduce指的是一個形如下面定義的函式。

def Reduce(k, [v1, v2, v3, ....])://return v
     pass

它接受一個key和該key對應的所有在Map函式中返回的value,返回k值的最終結果。所謂Reduce,就是把多個值“消減”為一個。

那麼Map-Reduce作為一個框架,通常是這樣執行的。

  • 多個節點在分別,獨立地執行Map函式。

Map1 ----> (k1, v1), (k2, v2), (k4, v4), (k8, v8)

Map2 ----> (k1, v9), (k4, v10)

Map3 ----> (k8, v11)

  • 然後,多個節點分別,獨立地執行Reduce函式。

Reduce1 (k1, [v1, v9])     ----> (k1, <some value>)

Reduce2 (k2, [v2])           ----> (k2, <some value>)

Reduce3 (k4, [v4, v10])   ---->  (k4, <some value>)

Reduce4 (k8, [v8, v11])   ---->  (k8, <some value>)

框架保證了Map們獨立地跑,並且每個Reduce接收到的引數,都是經過“總結“不同map的返回值得到的。程式設計師只需要實現自己的Map和Reduce函式就行了。一個典型的應用就是word count。

Map-Reduce有很多優點,主要就是節點之間在執行函式的過程中並不要通訊,而且有很好的擴充套件。

Google的Map-Reduce實現

有幾個要點。

1. 輸入和輸出以檔案的形式放在GFS上。這個檔案系統有個特點,雖然是分散式系統,但是寫檔案和建立新檔案是原子操作。

2. 輸入的所有檔案被分成M份,存放在GFS上。

3. 一個Master節點控制M個worker執行Map操作。v = 其中的一個檔案內容。

4. Map操作完成的結果被存放在本地磁碟上,並將存放位置告訴Master

5. 完成所有Map操作以後,Master並行地執行R個獨立的Reduce操作。

6. Map生成的中間值的k傳給了,第 hash(k) % R個Reduce worker操作。

容錯

1. Master週期性ping 所有worker,如果失敗,新的閒置worker將重新執行失敗的worker上的Map/Reduce操作。

2. Map/Reduce函式都是可以重入的,他們的中間結果,不會被其他節點看見,除非完成。

3. Worker的數目小於需要執行的M或者R

Lab1

這個課程所有Lab都是用Go語言實現的。答案用的Go 1.7,我用了1.8,發現有一些不一樣,待會說。

首先git clone這個專案。

git clone git://g.csail.mit.edu/6.824-golabs-2017 6.824

你會看到一個src 檔案目錄,裡面包含各個Lab的子目錄。

在MapReduce這個實驗,幾乎忠實地按照論文 MapReduce: Simplified Data Processing on Large Clusters 實現了Google的MapReduce框架。完成實驗對理解論文內容幫助蠻大。

理解這個Go語言實現的框架不是太難,就不筆記了。程式碼都有。有一點在Go 1.8裡要注意。

在common_map.go和common_reduce.go中,需要寫檔案存放中間結果,comment推薦使用json格式,並使用如下程式碼。

// enc := json.NewEncoder(file)
// for key := ... {
// enc.Encode(KeyValue{key, reduceF(...)})
// }
// file.Close()

如果你和我一樣使用的1.8,並且在doReduce的實現中按照上面的方法去寫檔案,第一個task是怎麼都過不了的。為啥呢?

在Go1.8中,如果按照上面的Encode json,生成的檔案是如下的格式

[{key:value}, {key: value}, ....]

開啟master_splitmerge.go檔案,看一下merge函式,發現他在一個一個decode這些KeyValue,類似下面這樣

// deco := json.NewDecoder(file)
// for key := ... {
// deco.Decode(&kv) <---這個就不行
// }
// file.Close()

而在merge函式中的Decode 居然在遇到"["的時候報錯=_=b。不知道是不是encoder和decoder在1.8被deprecate,否則以同樣方式寫卻沒辦法以同樣方式讀。

建議改用json.marshal/unmarshal

Part I: Map/Reduce input and output

這個就是讓你實現doMap和doReduce。注意不是Map和Reduce函式,而是call 客戶實現的Map和Reduce函式的邏輯。

首先是doMap。

該函式讀取某一個輸入檔案的內容,然後呼叫客戶實現的Map函式,再把返回值寫到檔案中。下面是我實現的程式碼部分。

bytes, err := ioutil.ReadFile(inFile)
if err != nil {
fmt.Errorf("doMap failed to read file %s due to %s", inFile, err)
return
}
contents := string(bytes)
kvs := mapF(inFile, contents)

if len(kvs) == 0 {
return
}

sort.Slice(kvs, func(i, j int) bool {
return ihash(kvs[i].Key) % nReduce < ihash(kvs[j].Key) % nReduce
})

var pr int = -1
var pkvs []KeyValue
for _, kv := range kvs {
r := ihash(kv.Key) % nReduce
if r != pr {
writeOutKeyValueArrays(jobName, mapTaskNumber, pr, pkvs)
pr = r
pkvs = nil
}
pkvs = append(pkvs, kv)
}
//write last one
writeOutKeyValueArrays(jobName, mapTaskNumber, pr, pkvs)

mapF就是客戶的業務邏輯,注意到它返回一個(k, v)的list。這個中間值需要放到檔案中。不過放檔案,按照規則,需要根據k的hash值,分別放到R個(nReduce個)檔案中。由於一共有M個doMap執行,實際上生成的中間檔案有M x R個。這個框架生成的中間檔案類似這樣的格式。

mrtmp.xxx-0-0
mrtmp.xxx-0-1
mrtmp.xxx-0-2
mrtmp.xxx-1-0
mrtmp.xxx-1-1
mrtmp.xxx-1-2

這表示2 (M)X 3(R)

為了節省時間,避免頻繁開啟關閉檔案,我們可以先按照k的hash值排序一下,然後依次開啟檔案存放它們。

再看一下doReduce函式。一共R個doReduce函式的執行,第r個會讀取mrtmp.xxx-*-r 檔案。例如,第1個doReduce要讀的檔案就是mrtmp.xxx-0-1和mrtmp.xxx-1-1。它會讀取這些檔案中產生的中間值,然後合併同樣的key所對應的value們,並傳入到客戶定義的Reduce函式中。將返回值寫到結果檔案中。

var aOfKV []KeyValue
for i := 0; i < nMap; i++ {
var kvs []KeyValue
inFilename := reduceName(jobName, i, reduceTaskNumber)
inFile, e := ioutil.ReadFile(inFilename)
if e != nil {
fmt.Errorf("Failed to read file %s due to %v", inFilename, e)
continue
}
json.Unmarshal(inFile, &kvs)
aOfKV = append(aOfKV, kvs...)
}
sort.Slice(aOfKV, func(i, j int) bool {
return aOfKV[i].Key < aOfKV[j].Key
})

//end
aOfKV = append(aOfKV, KeyValue{"", ""})

var curKey string
var curValues []string
var outputKVS []KeyValue
for _, kv := range aOfKV {
if kv.Key != curKey {
if len(curKey) > 0 {
outputStr := reduceF(curKey, curValues)
outputKVS = append(outputKVS, KeyValue{curKey, outputStr})
}
curKey = kv.Key
curValues = nil
}
curValues = append(curValues, kv.Value)
}
marshalledKVs, _ := json.Marshal(outputKVS)
err := ioutil.WriteFile(outFile, marshalledKVs, 0644)
if err != nil {
log.Fatal("Failed to write reduce file:", err)
}

注意這裡使用了類似的doMap的邏輯:先排序,再依次合併。所以在所有doReduce執行完成以後,應該有R個最後輸出檔案,然後框架會呼叫merge函式做合併。

Part II: Single-worker word count

讓你實現word count,沒啥說的。就是看一下strings裡面的FieldsFunc。

在Map函式中,檔案內容都傳給你了,要做的就是份成一個詞一個詞。

func mapWF(filename string, contents string) []mapreduce.KeyValue {
f := func(c rune) bool {
return !unicode.IsLetter(c) && !unicode.IsNumber(c)
}
var kvs []mapreduce.KeyValue
for _, w := range strings.FieldsFunc(contents, f) {
kvs = append(kvs, mapreduce.KeyValue{w, ""})
}
return kvs
}

在Reduce函式中,傳進來已經是所有的詞的出現了。看一下value 的len好了。

func mapWF(filename string, contents string) []mapreduce.KeyValue {
f := func(c rune) bool {
return !unicode.IsLetter(c) && !unicode.IsNumber(c)
}
var kvs []mapreduce.KeyValue
for _, w := range strings.FieldsFunc(contents, f) {
kvs = append(kvs, mapreduce.KeyValue{w, ""})
}
return kvs
}

Part III: Distributing MapReduce tasks/Part IV: Handling worker failures

我們之前實現的doMap和doReduce都是序列地執行的,也就是,每個Map依次執行,完成以後,再依次執行Reduce。這個從原始碼裡面的master.go的Sequential函式很容易看出來。

在該函式裡面,對每一個input file,都依次呼叫doMap,在reducePhase,則是把中間結果(就是上面的 mrtmp.xxx-*-*)依次傳給doReduce。

// Sequential runs map and reduce tasks sequentially, waiting for each task to
// complete before running the next.
func Sequential(jobName string, files []string, nreduce int,
mapF func(string, string) []KeyValue,
reduceF func(string, []string) string,
) (mr *Master) {
mr = newMaster("master")
go mr.run(jobName, files, nreduce, func(phase jobPhase) {
switch phase {
case mapPhase:
for i, f := range mr.files {
doMap(mr.jobName, i, f, mr.nReduce, mapF)
}
case reducePhase:
for i := 0; i < mr.nReduce; i++ {
doReduce(mr.jobName, i, mergeName(mr.jobName, i), len(mr.files), reduceF)
}
}
}, func() {
mr.stats = []int{len(files) + nreduce}
})
return
}

而這個任務,就是要把序列執行的Map和Reduce,變成並行的。當然,整個Map-Reduce還是執行在單機上的,只不過用了go裡面的RPC來模擬論文中master和worker之間的通訊。我們可以看看框架已經為我們做了什麼。

// Distributed schedules map and reduce tasks on workers that register with the
// master over RPC.
func Distributed(jobName string, files []string, nreduce int, master string) (mr *Master) {
mr = newMaster(master)
mr.startRPCServer()
go mr.run(jobName, files, nreduce,
func(phase jobPhase) {
ch := make(chan string)
go mr.forwardRegistrations(ch)
schedule(mr.jobName, mr.files, mr.nReduce, phase, ch)
},
func() {
mr.stats = mr.killWorkers()
mr.stopRPCServer()
})
return
}

首先,master啟動一個RPC server。在這個master server中,它會暴露一個叫做”Register“的函式。這個Register函式的主要作用就是接收worker的Register(好廢話),把worker(它其實也是一個RPC server)的RPC地址,通過呼叫RPC 呼叫master上的Register,放到master的一個list裡面。

// Register is an RPC method that is called by workers after they have started
// up to report that they are ready to receive tasks.
func (mr *Master) Register(args *RegisterArgs, _ *struct{}) error {
mr.Lock()
defer mr.Unlock()
debug("Register: worker %s\n", args.Worker)
mr.workers = append(mr.workers, args.Worker)

// tell forwardRegistrations() that there's a new workers[] entry.
mr.newCond.Broadcast()

return nil
}

接下來,forwardRegistrations把master中的registered worker從list中提取出來,依次傳送到channel。

而我們要實現的具體程式碼在schedule.go裡面。其中schedule函式需要完成指定數目的task。指定數目指的是,如果是做的Map 階段,那指定數目就是輸入檔案的數目M;如果是在Reduce階段,就是R。那麼schedule應該有下面的步驟:

  1. 如果指定數目的task還沒有完成那麼 從channel中提取一個註冊的worker RPC的地址;否則3。
  2. 非同步呼叫該worker上的DoTask方法。 在DoTask 方法中,已經有對doMap和doReduce的呼叫的邏輯,DoTask成功返回以後,重新把worker放回到channel,以待重用。goto 1。
  3. 等待所有task完成後退出。

在2中,有一個限制,我們不能給同一個worker RPC 地址傳送多次DoTask請求。要實現這個,我們可以在把worker從channel提取出來以後,不放回去,然後待到DoTask成功返回後,才放入待重用。

另外,在Task 4中,當傳送一個DoTask請求以後,該請求可能超時或者失敗。這時候需要重試。我們的想法是在在第二中的非同步呼叫邏輯裡面,如果DoTask 返回失敗,不把worker重新放入channel,而是直接丟棄。再從channel中取一個worker。

func schedule(jobName string, mapFiles []string, nReduce int, phase jobPhase, registerChan chan string) {
var ntasks int
var n_other int // number of inputs (for reduce) or outputs (for map)
switch phase {
case mapPhase:
ntasks = len(mapFiles)
n_other = nReduce
case reducePhase:
ntasks = nReduce
n_other = len(mapFiles)
}

fmt.Printf("Schedule: %v %v tasks (%d I/Os)\n", ntasks, phase, n_other)

// All ntasks tasks have to be scheduled on workers, and only once all of
// them have been completed successfully should the function return.
// Remember that workers may fail, and that any given worker may finish
// multiple tasks.
//
doneCh := make(chan int)
doneCnt := 0
var mux sync.Mutex
for nt := 0; nt < ntasks; nt++ {
go func(fileIndex int) {
for {
workerName := <-registerChan
ok := call(workerName,
"Worker.DoTask",
DoTaskArgs{jobName, mapFiles[fileIndex], phase, fileIndex, n_other},
nil)
if ok == false {
fmt.Printf("Failed to RPC call %v at %d, retrying\n", workerName, nt)
continue
}
mux.Lock()
doneCnt += 1
if doneCnt == ntasks {
doneCh <- 1
}
mux.Unlock()
registerChan <- workerName
break
}

} (nt)
}
<-doneCh
fmt.Printf("Schedule: %v phase done\n", phase)
}

注意這個nt一定要傳到goroutine裡面去,而不是作為一個閉包變數,因為它在迴圈中會變。

Part V: Inverted index generation (optional for extra credit)

這又是一個Map-Reduce應用。是讓你看某個單詞在哪些文件裡面出現過。

同樣的,在Map函式中分詞,詞作為key,而value是文件名字。

而Reduce函式中,僅僅是把文件列出來,配上文件數目。程式碼比較簡單就不貼了。

第一次做課程,覺得作業設計上很用心。