1. 程式人生 > >Hadoop學習筆記—8.Combiner與自定義Combiner

Hadoop學習筆記—8.Combiner與自定義Combiner

一、Combiner的出現背景

1.1 回顧Map階段五大步驟

  在第四篇博文《初識MapReduce》中,我們認識了MapReduce的八大步湊,其中在Map階段總共五個步驟,如下圖所示:

map section

  其中,step1.5是一個可選步驟,它就是我們今天需要了解的 Map規約 階段。現在,我們再來看看前一篇博文《計數器與自定義計數器》中的第一張關於計數器的圖:


  我們可以發現,其中有兩個計數器:Combine output recordsCombine input records,他們的計數都是0,這是因為我們在程式碼中沒有進行Map階段的規約操作。

1.2 為什麼需要進行Map規約操作

  眾所周知,Hadoop框架使用Mapper將資料處理成一個個的<key,value>鍵值對,在網路節點間對其進行整理(shuffle),然後使用Reducer處理資料並進行最終輸出。

  在上述過程中,我們看到至少兩個效能瓶頸

  (1)如果我們有10億個資料,Mapper會生成10億個鍵值對在網路間進行傳輸,但如果我們只是對資料求最大值,那麼很明顯的Mapper只需要輸出它所知道的最大值即可。這樣做不僅可以減輕網路壓力,同樣也可以大幅度提高程式效率。

  總結:網路頻寬嚴重被佔降低程式效率;

  (2)假設使用美國專利資料集中的國家一項來闡述資料傾斜這個定義,這樣的資料遠遠不是一致性的或者說平衡分佈的,由於大多數專利的國家都屬於美國,這樣不僅Mapper中的鍵值對、中間階段(shuffle)的鍵值對等,大多數的鍵值對最終會聚集於一個單一的Reducer之上,壓倒這個Reducer,從而大大降低程式的效能。

  總結:單一節點承載過重降低程式效能;

那麼,有木有一種方案能夠解決這兩個問題呢?

二、初步探索Combiner

2.1 Combiner的橫空出世

在MapReduce程式設計模型中,在Mapper和Reducer之間有一個非常重要的元件,它解決了上述的效能瓶頸問題,它就是Combiner

PS:

①與mapper和reducer不同的是,combiner沒有預設的實現,需要顯式的設定在conf中才有作用。

②並不是所有的job都適用combiner,只有操作滿足結合律的才可設定combiner。combine操作類似於:opt(opt(1, 2, 3), opt(4, 5, 6))。如果opt為求和、求最大值的話,可以使用,但是如果是求中值的話,不適用。

  每一個map都可能會產生大量的本地輸出Combiner的作用就是對map端的輸出先做一次合併,以減少在map和reduce節點之間的資料傳輸量,以提高網路IO效能,是MapReduce的一種優化手段之一,其具體的作用如下所述。

  (1)Combiner最基本是實現本地key的聚合,對map輸出的key排序,value進行迭代。如下所示:

  map: (K1, V1) → list(K2, V2) 
  combine: (K2, list(V2)) → list(K2, V2) 
  reduce: (K2, list(V2)) → list(K3, V3)

  (2)Combiner還有本地reduce功能(其本質上就是一個reduce),例如Hadoop自帶的wordcount的例子和找出value的最大值的程式,combiner和reduce完全一致,如下所示:

  map: (K1, V1) → list(K2, V2) 
  combine: (K2, list(V2)) → list(K3, V3) 
  reduce: (K3, list(V3)) → list(K4, V4)

PS:現在想想,如果在wordcount中不用combiner,那麼所有的結果都是reduce完成,效率會相對低下。使用combiner之後,先完成的map會在本地聚合,提升速度。對於hadoop自帶的wordcount的例子,value就是一個疊加的數字,所以map一結束就可以進行reduce的value疊加,而不必要等到所有的map結束再去進行reduce的value疊加。

2.2 融合Combiner的MapReduce

  前面文章中的程式碼都忽略了一個可以優化MapReduce作業所使用頻寬的步驟—Combiner,它在Mapper之後Reducer之前執行。Combiner是可選的,如果這個過程適合於你的作業,Combiner例項會在每一個執行map任務的節點上執行。Combiner會接收特定節點上的Mapper例項的輸出作為輸入,接著Combiner的輸出會被髮送到Reducer那裡,而不是傳送Mapper的輸出。Combiner是一個“迷你reduce”過程,它只處理單臺機器生成的資料

2.3 使用MyReducer作為Combiner

在前面文章中的WordCount程式碼中加入以下一句簡單的程式碼,即可加入Combiner方法:

    // 設定Map規約Combiner
    job.setCombinerClass(MyReducer.class);

  還是以下面的檔案內容為例,看看這次計數器會發生怎樣的改變?

  (1)上傳的測試檔案的內容

hello edison
hello kevin

  (2)除錯後的計數器日誌資訊

  可以看到,原本都為0的Combine input records和Combine output records發生了改變。我們可以清楚地看到map的輸出和combine的輸入統計是一致的,而combine的輸出與reduce的輸入統計是一樣的。由此可以看出規約操作成功,而且執行在map的最後,reduce之前。

三、自己定義Combiner

  為了能夠更加清晰的理解Combiner的工作原理,我們自定義一個Combiners類,不再使用MyReduce做為Combiners的類,具體的程式碼下面一一道來。

3.1 改寫Mapper類的map方法

  public static class MyMapper extends
            Mapper<LongWritable, Text, Text, LongWritable> {
        protected void map(LongWritable key, Text value,
                Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            String line = value.toString();
            String[] spilted = line.split(" ");
            for (String word : spilted) {
                context.write(new Text(word), new LongWritable(1L));
                // 為了顯示效果而輸出Mapper的輸出鍵值對資訊
                System.out.println("Mapper輸出<" + word + "," + 1 + ">");
            }
        };
    }

3.2 改寫Reducer類的reduce方法

     public static class MyReducer extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(Text key,
                java.lang.Iterable<LongWritable> values,
                Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            // 顯示次數表示redcue函式被呼叫了多少次,表示k2有多少個分組
            System.out.println("Reducer輸入分組<" + key.toString() + ",N(N>=1)>");

            long count = 0L;
            for (LongWritable value : values) {
                count += value.get();
                // 顯示次數表示輸入的k2,v2的鍵值對數量
                System.out.println("Reducer輸入鍵值對<" + key.toString() + ","
                        + value.get() + ">");
            }
            context.write(key, new LongWritable(count));
        };
    }

3.3 新增MyCombiner類並重寫reduce方法

    public static class MyCombiner extends
            Reducer<Text, LongWritable, Text, LongWritable> {
        protected void reduce(
                Text key,
                java.lang.Iterable<LongWritable> values,
                org.apache.hadoop.mapreduce.Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                throws java.io.IOException, InterruptedException {
            // 顯示次數表示規約函式被呼叫了多少次,表示k2有多少個分組
            System.out.println("Combiner輸入分組<" + key.toString() + ",N(N>=1)>");
            long count = 0L;
            for (LongWritable value : values) {
                count += value.get();
                // 顯示次數表示輸入的k2,v2的鍵值對數量
                System.out.println("Combiner輸入鍵值對<" + key.toString() + ","
                        + value.get() + ">");
            }
            context.write(key, new LongWritable(count));
            // 顯示次數表示輸出的k2,v2的鍵值對數量
            System.out.println("Combiner輸出鍵值對<" + key.toString() + "," + count
                    + ">");
        };
    }

3.4 新增設定Combiner的程式碼

    // 設定Map規約Combiner
    job.setCombinerClass(MyCombiner.class);

3.5 除錯執行的控制檯輸出資訊

  (1)Mapper

Mapper輸出<hello,1>
Mapper輸出<edison,1>
Mapper輸出<hello,1>
Mapper輸出<kevin,1>

  (2)Combiner

Combiner輸入分組<edison,N(N>=1)>
Combiner輸入鍵值對<edison,1>
Combiner輸出鍵值對<edison,1>
Combiner輸入分組<hello,N(N>=1)>
Combiner輸入鍵值對<hello,1>
Combiner輸入鍵值對<hello,1>
Combiner輸出鍵值對<hello,2>
Combiner輸入分組<kevin,N(N>=1)>
Combiner輸入鍵值對<kevin,1>
Combiner輸出鍵值對<kevin,1>

  這裡可以看出,在Combiner中進行了一次本地的Reduce操作,從而簡化了遠端Reduce節點的歸併壓力。

  (3)Reducer

Reducer輸入分組<edison,N(N>=1)>
Reducer輸入鍵值對<edison,1>
Reducer輸入分組<hello,N(N>=1)>
Reducer輸入鍵值對<hello,2>
Reducer輸入分組<kevin,N(N>=1)>
Reducer輸入鍵值對<kevin,1>

  這裡可以看出,在對hello的歸併上,只進行了一次操作就完成了。

  那麼,如果我們再來看看不新增Combiner時的控制檯輸出資訊:

  (1)Mapper

Mapper輸出<hello,1>
Mapper輸出<edison,1>
Mapper輸出<hello,1>
Mapper輸出<kevin,1>

  (2)Reducer

Reducer輸入分組<edison,N(N>=1)>
Reducer輸入鍵值對<edison,1>
Reducer輸入分組<hello,N(N>=1)>
Reducer輸入鍵值對<hello,1>
Reducer輸入鍵值對<hello,1>
Reducer輸入分組<kevin,N(N>=1)>
Reducer輸入鍵值對<kevin,1>

  可以看出,沒有采用Combiner時hello都是由Reducer節點來進行統一的歸併,也就是這裡為何會有兩次hello的輸入鍵值對了。

總結:從控制檯的輸出資訊我們可以發現,其實combine只是把兩個相同的hello進行規約,由此輸入給reduce的就變成了<hello,2>。在實際的Hadoop叢集操作中,我們是由多臺主機一起進行MapReduce的,如果加入規約操作,每一臺主機會在reduce之前進行一次對本機資料的規約,然後在通過叢集進行reduce操作,這樣就會大大節省reduce的時間,從而加快MapReduce的處理速度。

參考資料

(4)iPolaris,《Hadoop中Combiner的使用》:http://blog.csdn.net/ipolaris/article/details/8723782

作者:周旭龍

本文版權歸作者和部落格園共有,歡迎轉載,但未經作者同意必須保留此段宣告,且在文章頁面明顯位置給出原文連結。

相關推薦

Hadoop學習筆記—7.計數器定義計數器

一、Hadoop中的計數器 計數器:計數器是用來記錄job的執行進度和狀態的。它的作用可以理解為日誌。我們通常可以在程式的某個位置插入計數器,用來記錄資料或者進度的變化情況,它比日誌更便利進行分析。   例如,我們有一個檔案,其中包含如下內容: hello you hello me

Hadoop學習筆記—9.Partitioner定義Partitioner

一、初步探索Partitioner 1.1 再次回顧Map階段五大步驟   在第四篇博文《初識MapReduce》中,我們認識了MapReduce的八大步湊,其中在Map階段總共五個步驟,如下圖所示:   其中,step1.3就是一個分割槽操作。通過前面的學習我們知道Mapper最終處理的鍵值對&l

Hadoop學習筆記8.Combiner定義Combiner

一、Combiner的出現背景 1.1 回顧Map階段五大步驟   在第四篇博文《初識MapReduce》中,我們認識了MapReduce的八大步湊,其中在Map階段總共五個步驟,如下圖所示:   其中,step1.5是一個可選步驟,它就是我們今天需要了解的 Map規約 階段。現在,我們再來看看前一

Hadoopcombiner學習定義combiner

Combiner的概念 Combiner號稱本地的Reduce,Reduce的輸入是Combiner的最終輸出。 在MapReduce中,當map生成的資料過大時,頻寬就成了瓶頸,怎樣精簡壓縮傳給Reduce的資料,有不影響最終的結果呢。有一種方法就是使用Combiner,Comb

Shader學習筆記(三)定義光照模型,經典光照模型LambertHalfLambert

自定義光照模型 #pragma surface surfaceFaction lightModel surfaceFaction:著色器程式碼的方法的名字 lightModel:光照模型的名稱 自

AngularJs學習筆記(4)——定義指令

ref 告訴 ack 生命周期 .com bsp ctrl 參數變量 ng- 對指令的第一印象:它是一個自定義標簽! 先來看一個簡單的指令: <!doctype html> <html ng-app="myApp"> <head>

django 筆記8 url模板 定義函數 simple_tag

增刪改查 AI gis code core coo oct print sim 感謝alex老師~知識點: URL - 兩個沒見 url>路由系統> 函數或類 > 返回字符串 Form表單提交:

Ehcache學習筆記(2)--定義ehcache工具類

二:自定義EhcacheUtils 1、CacheUtils package cn.kexq.commons.utils; import net.sf.ehcache.Cache; import net.sf.ehcache.CacheManager; import net.sf.eh

shiro學習筆記(3)--定義realm、授權

一:自定義Realm 1、繼承AuthorizingRealm(因為該類中有認證、授權的抽象方法,實現簡單) public class MyRealm1 extends AuthorizingRealm{ @Override public String getName(

Android學習筆記之為Dialog定義佈局,並說明空指標問題

分享一下我老師大神的人工智慧教程!零基礎,通俗易懂!http://blog.csdn.net/jiangjunshow 也歡迎大家轉載本篇文章。分享知識,造福人民,實現我們中華民族偉大復興!        

Vue:學習筆記(七)-定義指令

提醒 原帖完整收藏於IT老兵驛站,並會不斷更新。 前言 前面總結到了元件,對混入也進行了研究,不過感覺沒有啥需要總結的,就先總結指令吧,參考這裡,記錄筆記。 正文 簡介 全域性註冊 // 註冊一個全域性自定義指令 `v-focus` Vue.di

Zynq-Linux移植學習筆記之31-使用者定義I2C驅動

1、背景介紹 板子上通過ZYNQ的I2C-0控制器連線了三片DBF晶片和一片Ti的226測功耗晶片,示意圖如下: 如上圖所示,三塊DBF晶片的I2C地址分別為2,4,8,Ti 226晶片的I2C地址為0x40.現在需要ZYNQ通過I2C匯流排讀寫這四塊晶片的暫存器數值

springmvc學習筆記(26)——定義型別轉換器

資料繫結流程 使用springmvc框架有諸多好處,其中較為突出的就是它的資料繫結。 當我們的前端傳過來一個表單的時候,我們只需要使用一個類物件(如Student物件)就接收,springmvc將幫我們把屬性一一對應的填充進去。這就是資料繫結。 資料繫結過程中,springmvc幫我們把前端

springmvc學習筆記(28)——定義攔截器

1. 自定義攔截器 所謂的攔截器,就是用來攔截請求,因此我們可以對使用者發來的請求做處理。  寫一個類,實現攔截器的介面 import javax.servlet.http.HttpServletRequest; import javax.servlet.http.H

【轉】WPF定義控制元件樣式(8)-ComboBox定義多選控制元件MultComboBox

一.前言   申明:WPF自定義控制元件與樣式是一個系列文章,前後是有些關聯的,但大多是按照由簡到繁的順序逐步釋出的等。   本文主要內容: 下拉選擇控制元件ComboBox的自定義樣式及擴充套件; 自定義多選控制元件MultiComboBox; 二.下拉選擇控制元件ComboBox的自

類的學習筆記(3)——定義裝飾器及裝飾器的理解

例一: 實現多加100 def fun1(x):      def fun2(y):           return x(y)+100       return fun2              #裝飾器 def ff(y):       return y*y   

react native學習筆記24——Modal實現定義彈出對話方塊

前言 上一篇文章介紹React Native系統提供的兩個彈出框的api——Alert與AlertIOS,Alert可以在雙平臺通用,但是隻能展示資訊量有限功能單一的文字對話方塊。AlertIOS比Alert稍微豐富一點,可以展示供使用者輸入的對話方塊,但只能

iPhone開發學習筆記005——使用XIB定義一個UIView,然後將這個view新增到controller的view

一、新建一個single view application型別的iOS application工程,名字取為CustomView,如下圖,我們不往CustomViewViewController.xib中新增任何控制元件:  二、新建一個CustomView.xib,過程如下:然後往介面上拖一個label和

Maven學習筆記(一)——定義maven變數以及maven內建常量

在建立Maven工程後,外掛配置中通常會用到一些Maven變數,因此需要找個地方對這些變數進行統一定義,下面介紹如何定義自定義變數。 在根節點project下增加properties節點,所有自定義變數均可以定義在此節點內,如下所示: <!-- 全域性屬性配置 --

torch學習筆記1:實現定義

當我們要實現自己的一些idea時,torch自帶的模組和函式已經不能滿足,我們需要自己實現層(或者類),一般的做法是把自定義層加入到已有的torch模組中。 實現 lua實現 如果自定義層的功能可以通過呼叫torch中已有的函式實現,那就只需要用l