1. 程式人生 > >MapReduce-大規模資料集分散式並行運算程式設計模型

MapReduce-大規模資料集分散式並行運算程式設計模型

    本文轉載自CSDN部落格,純為技術資料備份!

    MapReduce的名字源於函數語言程式設計模型中的兩項核心操作:MapReduce。也許熟悉Functional Programming(FP)的人見到這兩個詞會倍感親切。因為MapReduce這兩個術語源自Lisp語言和函數語言程式設計。Map是把一組資料一對一的對映為另外的一組資料,其對映的規則由一個函式來指定。Reduce是對一組資料進行歸約,這個歸約的規則由一個函式指定。Map是一個把資料分開的過程,Reduce則是把分開的資料合併的過程。如Hadoopwordcount例子:用Map[one,word,one,dream]

進行對映就變成了[{one,1}, {word,1}, {one,1}, {dream,1}],再用Reduce[{one,1}, {word,1}, {one,1}, {dream,1}]歸約變成[{one,2}, {word,1}, {dream,1}]的結果集。

    對數組裡的每個元素進行相同操作的一段程式碼:

a = [1, 2, 3];
for (i = 0; i < a.length; i++){
    a[i] = a[i] * 2;
}
for (i = 0; i < a.length; i++){
    print(a[i]);
}

   常常要對數組裡的所有元素做同一件事情,因此你可以寫個這樣的函式:   

function map(fn, a){
     for (i = 0; i < a.length; i++){
             fn(a[i]);
    }
}

   現在可以把上面的東西改成:

map(function(x) { return x * 2; }, a);
map(print, a);
 

    另一個常見的任務是將陣列內的所有元素按照某種方式彙總起來:

function sum(a){
    s = 0;
    for (i = 0; i < a.length; i++){
        s += a[i];
    }
    return s;
}

function join(a){
    s = "";
    for (i = 0; i < a.length; i++){
         s = s .. a[i]; // ..是字串連線操作符
    }
    return s;
}

print(sum([1,2,3]));
print(join(["a","b","c"]));

    注意sumjoin長得很像,你也許想把它們抽象為一個將陣列內的所有元素按某種演算法彙總起來的泛型函式:

function reduce(fn, a, init){
   s = init;
   for (i = 0; i < a.length; i++){
      s = fn(s, a[i]);
   }
   return s;
}

    這樣sumjoin就變成下面的樣子了:

function sum(a){
    return reduce(function(a, b) { return a + b; }, a, 0 );
}

function join(a){
    return reduce(function(a, b) { return a .. b; }, a, "" );
}
 

    讓我們看回map函式。當你要對陣列內的每個元素做一些事,你很可能不在乎哪個元素先做。無論由第一個元素開始執行,還是是由最後一個元素開始執行,你的結果都是一樣的。這樣如果你手頭上有2個CPU,你可以寫段程式碼,使它們各自處理1/2元素,於是乎map快了兩倍。設想你在全球有千千萬萬臺伺服器,恰好你有一個真的很大很大的陣列,現在你可以在幾千臺伺服器上同時執行map,讓每臺伺服器都來解決同一個問題的一小部分。

     Map的定義:

Map, written by the user, takes an input pair and produces a set of intermediate key/value pairs. The MapReduce library groups together all intermediate values associated with the same intermediate key I and passes them to the Reduce function.

     Reduce的定義:

The Reduce function, also written by the user, accepts an intermediate key I and a set of values for that key. It merges together these values to form a possibly smaller set of values. Typically just zero or one output value is produced per Reduce invocation. The intermediate values are supplied to the user’s reduce function via an iterator. This allows us to handle lists of values that are too large to fit in memory.

    MapReduce論文中給出了這樣一個例子:在一個文件集合中統計每個單詞出現的次數。Map操作的輸入是每一篇文件,將輸入文件中每一個單詞的出現輸出到中間檔案中去。

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
            EmitIntermediate(w, "1");

    比如我們有兩篇文件,內容分別是

        A - "I love programming"

        B - "I am a blogger, you are also a blogger"

    A文件經過Map運算後輸出的中間檔案將會是:

        I,1

        love,1

        programming,1

    B文件經過Map運算後輸出的中間檔案將會是:

        I,1

       am,1

       a,1

       blogger,1

       you,1

       are,1

       also,1

       a,1

       blogger,1

    Reduce操作的輸入是單詞和出現次數的序列。用上面的例子來說,就是 (I, [1, 1]), (love, [1]), (programming, [1]), (am, [1]), (a, [1,1]) 等。然後根據每個單詞,算出總的出現次數。

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    Emit(AsString(result));

    最終結果是:("I", "2"), ("a", "2"), ……

    實際的執行順序是:

        MapReduce LibraryInput分成M份。這裡的Input Splitter也可以是多臺機器並行Split

        MasterMJob分給Idle狀態的Mworker來處理;

        對於輸入中的每一個<key, value> pair 進行Map操作,將中間結果BufferMemory裡;

        定期的(或者根據記憶體狀態),將Buffer中的中間資訊Dump到本地磁碟上,並且把檔案資訊傳回給MasterMaster需要把這些資訊傳送給Reduce worker)。這裡最重要的一點是,在寫磁碟的時候,需要將中間檔案做Partition(比如R個)。拿上面的例子來舉例,如果把所有的資訊存到一個檔案,Reduce worker又會變成瓶頸。我們只需要保證相同Key能出現在同一個Partition裡面就可以把這個問題分解。

         RReduce worker開始工作,從不同的Map workerPartition那裡拿到資料(read the buffered data from the local disks of the map workers),用key進行排序(如果記憶體中放不下需要用到外部排序 – external sort)。很顯然,排序(或者說Group)是Reduce函式之前必須做的一步。 這裡面很關鍵的是,每個Reduce worker會去從很多Map worker那裡拿到X(0<X<R) Partition的中間結果,這樣,所有屬於這個Key的資訊已經都在這個worker上了。

         Reduce worker遍歷中間資料,對每一個唯一Key,執行Reduce函式(引數是這個key以及相對應的一系列Value)。

         執行完畢後,喚醒使用者程式,返回結果(最後應該有ROutput,每個Reduce Worker一個)。

     可見,這裡的分(Divide)體現在兩步,分別是將輸入分成M份,以及將Map的中間結果分成R份。將輸入分開通常很簡單,Map的中間結果通常用”hash(key) mod R”這個結果作為標準,保證相同的Key出現在同一個Partition裡面。當然,使用者也可以指定自己的Partition Function,比如,對於Url Key,如果希望同一個HostURL出現在同一個Partition,可以用”hash(Hostname(urlkey)) mod R”作為Partition Function

     對於上面的例子來說,每個文件中都可能會出現成千上萬的 ("the", 1)這樣的中間結果,瑣碎的中間檔案必然導致傳輸上的損失。因此,MapReduce還支援使用者提供Combiner Function。這個函式通常與Reduce Function有相同的實現,不同點在於Reduce函式的輸出是最終結果,而Combiner函式的輸出是Reduce函式的某一個輸入的中間檔案。