1. 程式人生 > >Hadoop的combiner學習與自定義combiner

Hadoop的combiner學習與自定義combiner

Combiner的概念

Combiner號稱本地的Reduce,Reduce的輸入是Combiner的最終輸出。

在MapReduce中,當map生成的資料過大時,頻寬就成了瓶頸,怎樣精簡壓縮傳給Reduce的資料,有不影響最終的結果呢。有一種方法就是使用Combiner,Combiner號稱本地的Reduce。Combiner是用reducer來定義的,多數的情況下Combiner和reduce處理的是同一種邏輯,所以job.setCombinerClass()的引數可以直接使用定義的reduce,當然也可以單獨去定義一個有別於reduce的Combiner,繼承Reducer,寫法基本上定義reduce一樣。

combiner的價值定位:

眾所周知,Hadoop框架使用Mapper將資料塊處理成一個個的<key,value>鍵值對,在網路節點間對其進行洗牌整理(shuffle),然後使用Reducer處理計算結果資料並進行最終結果輸出。

  在上述過程中,我們看到至少兩個效能瓶頸:

  (1)如果我們有10億個資料,Mapper會生成10億個鍵值對在網路間進行傳輸,但如果我們只是計算資料求最大值,那麼很明顯的Mapper只需要輸出它所知道的最大值即可。準確的說是知道每個資料塊中的最大值即可(不拉取其他資料到Reducer),減小計算的基數,不僅可以減輕網路壓力,同時大幅度提高程式效率。

  總結:網路頻寬嚴重被佔降低程式效率;

  (2)假設使用美國專利資料集中的國家一項來闡述資料傾斜這個定義,這樣的資料遠遠不是一致性的或者說平衡分佈的,這樣不僅Mapper中的鍵值對、中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集於一個單一的Reducer之上,壓倒這個Reducer,從而大大降低程式計算的效能。

  總結:單一節點承載過重降低程式效能;

Combiner的價值就由這兩個問題體現的淋漓盡致。

Combiner詳解:

 在MapReduce程式設計模型中,在Mapper和Reducer之間有一個非常重要的元件,它解決了上述的效能瓶頸問題,它就是Combiner。

注意:

①與mapper和reducer不同的是,combiner沒有預設的實現,需要顯式的設定在conf中才有作用。

②並不是所有的job都適用combiner,只有操作滿足結合律的才可設定combiner。combine操作類似於:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。

  每一個map都可能會產生大量的本地輸出,Combiner的作用就是對map端的輸出先做一次合併,以減少在map和reduce節點之間的資料傳輸量,以提高網路IO效能,是MapReduce的一種優化手段之一,其具體的作用如下所述。

  (1)Combiner最基本是實現本地key的聚合,對map輸出的key排序,value進行迭代。如下所示:

  map: (K1, V1) → list(K2, V2) 
  combiner: (K2, list(V2)) → list(K2, V2) 
  reduce: (K2, list(V2)) → list(K3, V3)

  (2)Combiner還有本地reduce功能(其本質上就是一個reduce),例如Hadoop自帶的wordcount的例子和找出value的最大值的程式,combiner和reduce完全一致,如下所示:

  map: (K1, V1) → list(K2, V2) 
  combine: (K2, list(V2)) → list(K3, V3) 
  reduce: (K3, list(V3)) → list(K4, V4)

PS:回想一下,如果在wordcount中沒有Combiner來做先鋒,資料量上千萬的話Reduce的效率一定會低到難以想象。使用combiner之後,先完成的map會在本地聚合,為Reduce的計算做好第一次的資料聚合,提升計算效率。這樣,對於hadoop自帶的wordcount的例子,value就是一個疊加的數字,所以map一結束就可以進行reduce的value疊加,而不必要等到所有的map結束再去進行reduce的value疊加。

加入Combiner的MapReduce

  前面文章中的程式碼都忽略了一個可以優化MapReduce作業所使用頻寬的步驟—Combiner,它在Mapper之後Reducer之前執行。Combiner是可選的,如果這個過程適合於你的作業,Combiner例項會在每一個執行map任務的節點上執行。Combiner會接收特定節點上的Mapper例項的輸出作為輸入,接著Combiner的輸出會被髮送到Reducer那裡,而不是傳送Mapper的輸出。Combiner是一個“迷你reduce”過程,它只處理單臺機器生成的資料。

舉個小栗子

我們抓取去年每個季度最高溫度的資料:

第一個mapper資料(2017,[34,25])

第二個mapper資料(2017,[33,32])

如果不考慮使用Combiner的計算過程的話Reducer如下:

(2017,[34,25,33,32])//通過排序後得到最大值

如果考慮Combiner的計算過程Reducer拿到的就是精簡壓縮並處理之後的資料集了:

(2017,[34])

(2017,[33])

這兩種方法的結果是一定相同的,使用Combiner最大的好處是節省網路傳輸的資料,這對於提高整體的計算效率是非常有幫助的。當然好的方法當然是要有對口的問題做依靠,若是在不對的地方使用combiner,很有可能會適得其反,eg:求一組資料的平均值。(反例就不列舉了,若有需要可以私信或留言)

使用MyReducer作為Combiner

  在前面文章中的WordCount程式碼中加入以下一句簡單的程式碼,即可加入Combiner方法:

  // 設定Map規約Combiner
    job.setCombinerClass(MyReducer.class);

自定義的 Combiner

為了能夠更加清晰的理解Combiner的工作原理,我們自定義一個Combiners類,不再使用MyReduce做為Combiners的類,具體的程式碼下面一一道來。

改寫Mapper類的map方法

 public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            String line = value.toString();
            String[] spilted = line.split("\\s");
            for (String word : spilted) {
                context.write(new Text(word), new LongWritable(1L));
                // 為了顯示效果而輸出Mapper的輸出鍵值對資訊
                System.out.println("Mapper輸出<" + word + "," + 1 + ">");
            }
        };
    }

改寫Reducer類的reduce方法新增MyCombiner類並重寫reduce方法

public static class MyCombiner extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(
                Text key,
                java.lang.Iterable<LongWritable> values,
                org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            // 顯示次數表示Combiner執行了多少次,表示k2有多少個分組
            System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>");
            long count = 0L;
            for (LongWritable value : values) {
                count += value.get();
                // 顯示次數表示輸入的k2,v2的鍵值對數量
                System.out.println("Combiner輸入鍵值對<" + key.toString() + ","
                        + value.get() + ">");
            }
            context.write(key, new LongWritable(count));
            // 顯示次數表示輸出的k2,v2的鍵值對數量
            System.out.println("Combiner輸出鍵值對<" + key.toString() + "," + count
                    + ">");
        };
    }

新增MyCombiner類並重寫reduce方法

public static class MyCombiner extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(
                Text key,
                java.lang.Iterable<LongWritable> values,
                org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            // 顯示次數表示規約函式被呼叫了多少次,表示k2有多少個分組
            System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>");
            long count = 0L;
            for (LongWritable value : values) {
                count += value.get();
                // 顯示次數表示輸入的k2,v2的鍵值對數量
                System.out.println("Combiner輸入鍵值對<" + key.toString() + ","
                        + value.get() + ">");
            }
            context.write(key, new LongWritable(count));
            // 顯示次數表示輸出的k2,v2的鍵值對數量
            System.out.println("Combiner輸出鍵值對<" + key.toString() + "," + count
                    + ">");
        };
    }

 

新增設定Combiner的程式碼

 

    // 設定Map規約Combiner類(函式)
    job.setCombinerClass(MyCombiner.class);

除錯執行的控制檯列印資訊

  (1)Mapper時期

Mapper輸出<hello,1>
Mapper輸出<edison,1>
Mapper輸出<hello,1>
Mapper輸出<kevin,1>

  (2)Combiner時期

 

Combiner輸入分組<edison,N(N>=1)>
Combiner輸入鍵值對<edison,1>
Combiner輸出鍵值對<edison,1>
Combiner輸入分組<hello,N(N>=1)>
Combiner輸入鍵值對<hello,1>
Combiner輸入鍵值對<hello,1>
Combiner輸出鍵值對<hello,2>
Combiner輸入分組<kevin,N(N>=1)>
Combiner輸入鍵值對<kevin,1>
Combiner輸出鍵值對<kevin,1>

  這裡可以看出,在Combiner中進行了一次本地的Reduce操作,從而簡化了遠端Reduce節點的歸併壓力。

  (3)Reducer時期

Reducer輸入分組<edison,N(N>=1)>
Reducer輸入鍵值對<edison,1>
Reducer輸入分組<hello,N(N>=1)>
Reducer輸入鍵值對<hello,2>
Reducer輸入分組<kevin,N(N>=1)>
Reducer輸入鍵值對<kevin,1>

  這裡可以看出,在對hello的歸併計算一次,只進行一次操作就完成了。

 

  那麼,如果我們再來看看不新增Combiner時的控制檯輸出資訊:

  (1)Mapper

Mapper輸出<hello,1>
Mapper輸出<edison,1>
Mapper輸出<hello,1>
Mapper輸出<kevin,1>

  (2)Reducer

Reducer輸入分組<edison,N(N>=1)>
Reducer輸入鍵值對<edison,1>
Reducer輸入分組<hello,N(N>=1)>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入分組<kevin,N(N>=1)>
Reducer輸入鍵值對<kevin,1>

  可以看出,沒有采用Combiner時hello都是由Reducer節點來進行統一的歸併,也就是這裡為何會有兩次hello的輸入鍵值對了。

總結:從控制檯的輸出資訊我們可以發現,其實combine只是把兩個相同的hello進行規約,由此輸入給reduce的就變成了<hello,2>。在實際的Hadoop叢集操作中,我們是由多臺主機一起進行MapReduce的,如果加入規約操作,每一臺主機會在reduce之前進行一次對本機資料的規約,然後在通過叢集進行reduce操作,這樣就會大大節省reduce的時間,從而加快MapReduce的處理速度。

參考: https://blog.csdn.net/deguotiantang/article/details/58586972