1. 程式人生 > >Hadoop MapReduce執行過程詳解(帶hadoop例子)

Hadoop MapReduce執行過程詳解(帶hadoop例子)

問題導讀

1.MapReduce是如何執行任務的?
2.Mapper任務是怎樣的一個過程?
3.Reduce是如何執行任務的?
4.鍵值對是如何編號的?
5.例項,如何計算沒見最高氣溫?



分析MapReduce執行過程
    MapReduce執行的時候,會通過Mapper執行的任務讀取HDFS中的資料檔案,然後呼叫自己的方法,處理資料,最後輸出。Reducer任務會接收Mapper任務輸出的資料,作為自己的輸入資料,呼叫自己的方法,最後輸出到HDFS的檔案中。整個流程如圖:
 

Mapper任務的執行過程詳解
    每個Mapper任務是一個java程序,它會讀取HDFS中的檔案,解析成很多的鍵值對,經過我們覆蓋的map方法處理後,轉換為很多的鍵值對再輸出。整個Mapper任務的處理過程又可以分為以下幾個階段,如圖所示。


 

在上圖中,把Mapper任務的執行過程分為六個階段

  • 第一階段是把輸入檔案按照一定的標準分片(InputSplit),每個輸入片的大小是固定的。預設情況下,輸入片(InputSplit)的大小與資料塊(Block)的大小是相同的。如果資料塊(Block)的大小是預設值64MB,輸入檔案有兩個,一個是32MB,一個是72MB。那麼小的檔案是一個輸入片,大檔案會分為兩個資料塊,那麼是兩個輸入片。一共產生三個輸入片。每一個輸入片由一個Mapper程序處理。這裡的三個輸入片,會有三個Mapper程序處理。
  • 第二階段是對輸入片中的記錄按照一定的規則解析成鍵值對。有個預設規則是把每一行文字內容解析成鍵值對。“鍵”是每一行的起始位置(單位是位元組),“值”是本行的文字內容。
  • 第三階段是呼叫Mapper類中的map方法。第二階段中解析出來的每一個鍵值對,呼叫一次map方法。如果有1000個鍵值對,就會呼叫1000次map方法。每一次呼叫map方法會輸出零個或者多個鍵值對。
  • 第四階段是按照一定的規則對第三階段輸出的鍵值對進行分割槽。比較是基於鍵進行的。比如我們的鍵表示省份(如北京、上海、山東等),那麼就可以按照不同省份進行分割槽,同一個省份的鍵值對劃分到一個區中。預設是隻有一個區。分割槽的數量就是Reducer任務執行的數量。預設只有一個Reducer任務。
  • 第五階段是對每個分割槽中的鍵值對進行排序。首先,按照鍵進行排序,對於鍵相同的鍵值對,按照值進行排序。比如三個鍵值對<2,2>、<1,3>、<2,1>,鍵和值分別是整數。那麼排序後的結果是<1,3>、<2,1>、<2,2>。如果有第六階段,那麼進入第六階段;如果沒有,直接輸出到本地的linux檔案中。
  • 第六階段是對資料進行歸約處理,也就是reduce處理。鍵相等的鍵值對會呼叫一次reduce方法。經過這一階段,資料量會減少。歸約後的資料輸出到本地的linxu檔案中。本階段預設是沒有的,需要使用者自己增加這一階段的程式碼。
Reducer任務的執行過程詳解
   每個Reducer任務是一個java程序。Reducer任務接收Mapper任務的輸出,歸約處理後寫入到HDFS中,可以分為如下圖所示的幾個階段。
 
  • 第一階段是Reducer任務會主動從Mapper任務複製其輸出的鍵值對。Mapper任務可能會有很多,因此Reducer會複製多個Mapper的輸出。
  • 第二階段是把複製到Reducer本地資料,全部進行合併,即把分散的資料合併成一個大的資料。再對合並後的資料排序。
  • 第三階段是對排序後的鍵值對呼叫reduce方法。鍵相等的鍵值對呼叫一次reduce方法,每次呼叫會產生零個或者多個鍵值對。最後把這些輸出的鍵值對寫入到HDFS檔案中。
在整個MapReduce程式的開發過程中,我們最大的工作量是覆蓋map函式和覆蓋reduce函式。
鍵值對的編號
   在對Mapper任務、Reducer任務的分析過程中,會看到很多階段都出現了鍵值對,讀者容易混淆,所以這裡對鍵值對進行編號,方便大家理解鍵值對的變化情況,如下圖所示。
 
在上圖中,對於Mapper任務輸入的鍵值對,定義為key1和value1。在map方法中處理後,輸出的鍵值對,定義為key2和value2。reduce方法接收key2和value2,處理後,輸出key3和value3。在下文討論鍵值對時,可能把key1和value1簡寫為<k1,v1>,key2和value2簡寫為<k2,v2>,key3和value3簡寫為<k3,v3>。
以上內容來自:http://www.superwu.cn/2013/08/21/530/

-----------------------分------------------割----------------線-------------------------

例子:求每年最高氣溫

在HDFS中的根目錄下有以下檔案格式: /input.txt

2014010114
2014010216
2014010317
2014010410
2014010506
2012010609
2012010732
2012010812
2012010919
2012011023
2001010116
2001010212
2001010310
2001010411
2001010529
2013010619
2013010722
2013010812
2013010929
2013011023
2008010105
2008010216
2008010337
2008010414
2008010516
2007010619
2007010712
2007010812
2007010999
2007011023
2010010114
2010010216
2010010317
2010010410
2010010506
2015010649
2015010722
2015010812
2015010999
2015011023


    比如:2010012325表示在2010年01月23日的氣溫為25度。現在要求使用MapReduce,計算每一年出現過的最大氣溫。

    在寫程式碼之前,先確保正確的匯入了相關的jar包。我使用的是maven,可以到http://mvnrepository.com去搜索這幾個artifactId。

    此程式需要以Hadoop檔案作為輸入檔案,以Hadoop檔案作為輸出檔案,因此需要用到檔案系統,於是需要引入hadoop-hdfs包;我們需要向Map-Reduce叢集提交任務,需要用到Map-Reduce的客戶端,於是需要匯入hadoop-mapreduce-client-jobclient包;另外,在處理資料的時候會用到一些hadoop的資料型別例如IntWritable和Text等,因此需要匯入hadoop-common包。於是執行此程式所需要的相關依賴有以下幾個:

<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
    <version>2.4.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.4.0</version>
</dependency>


包導好了後, 設計程式碼如下

package com.abc.yarn;
  
import java.io.IOException;
  
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  
publicclass Temperature {
    /**
     * 四個泛型型別分別代表:
     * KeyIn        Mapper的輸入資料的Key,這裡是每行文字的起始位置(0,11,...)
     * ValueIn      Mapper的輸入資料的Value,這裡是每行文字
     * KeyOut       Mapper的輸出資料的Key,這裡是每行文字中的“年份”
     * ValueOut     Mapper的輸出資料的Value,這裡是每行文字中的“氣溫”
     */
    staticclass TempMapper extends
            Mapper<LongWritable, Text, Text, IntWritable> {
        @Override
        publicvoid map(LongWritable key, Text value, Context context)
                throwsIOException, InterruptedException {
            // 列印樣本: Before Mapper: 0, 2000010115
            System.out.print("Before Mapper: " + key + ", " + value);
            String line = value.toString();
            String year = line.substring(0,4);
            inttemperature = Integer.parseInt(line.substring(8));
            context.write(newText(year), newIntWritable(temperature));
            // 列印樣本: After Mapper:2000, 15
            System.out.println(
                    "======"+
                    "After Mapper:" + newText(year) + ", " + newIntWritable(temperature));
        }
    }
  
    /**
     * 四個泛型型別分別代表:
     * KeyIn        Reducer的輸入資料的Key,這裡是每行文字中的“年份”
     * ValueIn      Reducer的輸入資料的Value,這裡是每行文字中的“氣溫”
     * KeyOut       Reducer的輸出資料的Key,這裡是不重複的“年份”
     * ValueOut     Reducer的輸出資料的Value,這裡是這一年中的“最高氣溫”
     */
    staticclass TempReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        publicvoid reduce(Text key, Iterable<IntWritable> values,
                Context context) throwsIOException, InterruptedException {
            intmaxValue = Integer.MIN_VALUE;
            StringBuffer sb = newStringBuffer();
            //取values的最大值
            for(IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
                sb.append(value).append(", ");
            }
            // 列印樣本: Before Reduce: 2000, 15, 23, 99, 12, 22, 
            System.out.print("Before Reduce: " + key + ", " + sb.toString());
            context.write(key,newIntWritable(maxValue));
            // 列印樣本: After Reduce: 2000, 99
            System.out.println(
                    "======"+
                    "After Reduce: " + key + ", " + maxValue);
        }
    }
  
    publicstatic void main(String[] args) throwsException {
        //輸入路徑
        String dst = "hdfs://localhost:9000/intput.txt";
        //輸出路徑,必須是不存在的,空檔案加也不行。
        String dstOut = "hdfs://localhost:9000/output";
        Configuration hadoopConfig = newConfiguration();
          
        hadoopConfig.set("fs.hdfs.impl",
            org.apache.hadoop.hdfs.DistributedFileSystem.class.getName()
        );
        hadoopConfig.set("fs.file.impl",
            org.apache.hadoop.fs.LocalFileSystem.class.getName()
        );
        Job job = newJob(hadoopConfig);
          
        //如果需要打成jar執行,需要下面這句
        //job.setJarByClass(NewMaxTemperature.class);
  
        //job執行作業時輸入和輸出檔案的路徑
        FileInputFormat.addInputPath(job,newPath(dst));
        FileOutputFormat.setOutputPath(job,newPath(dstOut));
  
        //指定自定義的Mapper和Reducer作為兩個階段的任務處理類
        job.setMapperClass(TempMapper.class);
        job.setReducerClass(TempReducer.class);
          
        //設定最後輸出結果的Key和Value的型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
          
        //執行job,直到完成
        job.waitForCompletion(true);
        System.out.println("Finished");
    }
}


上面程式碼中,注意Mapper類的泛型不是java的基本型別,而是Hadoop的資料型別Text、IntWritable。我們可以簡單的等價為java的類String、int。

程式碼中Mapper類的泛型依次是<k1,v1,k2,v2>。map方法的第二個形參是行文字內容,是我們關心的。核心程式碼是把行文字內容按照空格拆分,把每行資料中“年”和“氣溫”提取出來,其中“年”作為新的鍵,“溫度”作為新的值,寫入到上下文context中。在這裡,因為每一年有多行資料,因此每一行都會輸出一個<年份, 氣溫>鍵值對。

下面是控制檯列印結果:

Before Mapper: 0, 2014010114======After Mapper:2014, 14
Before Mapper: 11, 2014010216======After Mapper:2014, 16
Before Mapper: 22, 2014010317======After Mapper:2014, 17
Before Mapper: 33, 2014010410======After Mapper:2014, 10
Before Mapper: 44, 2014010506======After Mapper:2014, 6
Before Mapper: 55, 2012010609======After Mapper:2012, 9
Before Mapper: 66, 2012010732======After Mapper:2012, 32
Before Mapper: 77, 2012010812======After Mapper:2012, 12
Before Mapper: 88, 2012010919======After Mapper:2012, 19
Before Mapper: 99, 2012011023======After Mapper:2012, 23
Before Mapper: 110, 2001010116======After Mapper:2001, 16
Before Mapper: 121, 2001010212======After Mapper:2001, 12
Before Mapper: 132, 2001010310======After Mapper:2001, 10
Before Mapper: 143, 2001010411======After Mapper:2001, 11
Before Mapper: 154, 2001010529======After Mapper:2001, 29
Before Mapper: 165, 2013010619======After Mapper:2013, 19
Before Mapper: 176, 2013010722======After Mapper:2013, 22
Before Mapper: 187, 2013010812======After Mapper:2013, 12
Before Mapper: 198, 2013010929======After Mapper:2013, 29
Before Mapper: 209, 2013011023======After Mapper:2013, 23
Before Mapper: 220, 2008010105======After Mapper:2008, 5
Before Mapper: 231, 2008010216======After Mapper:2008, 16
Before Mapper: 242, 2008010337======After Mapper:2008, 37
Before Mapper: 253, 2008010414======After Mapper:2008, 14
Before Mapper: 264, 2008010516======After Mapper:2008, 16
Before Mapper: 275, 2007010619======After Mapper:2007, 19
Before Mapper: 286, 2007010712======After Mapper:2007, 12
Before Mapper: 297, 2007010812======After Mapper:2007, 12
Before Mapper: 308, 2007010999======After Mapper:2007, 99
Before Mapper: 319, 2007011023======After Mapper:2007, 23
Before Mapper: 330, 2010010114======After Mapper:2010, 14
Before Mapper: 341, 2010010216======After Mapper:2010, 16
Before Mapper: 352, 2010010317======After Mapper:2010, 17
Before Mapper: 363, 2010010410======After Mapper:2010, 10
Before Mapper: 374, 2010010506======After Mapper:2010, 6
Before Mapper: 385, 2015010649======After Mapper:2015, 49
Before Mapper: 396, 2015010722======After Mapper:2015, 22
Before Mapper: 407, 2015010812======After Mapper:2015, 12
Before Mapper: 418, 2015010999======After Mapper:2015, 99
Before Mapper: 429, 2015011023======After Mapper:2015, 23
Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99



    執行結果:

 

對分析的驗證  
    從列印的日誌中可以看出:

  • Mapper的輸入資料(k1,v1)格式是:預設的按行分的鍵值對<0, 2010012325>,<11, 2012010123>...
  • Reducer的輸入資料格式是:把相同的鍵合併後的鍵值對:<2001, [12, 32, 25...]>,<2007, [20, 34, 30...]>...
  • Reducer的輸出數(k3,v3)據格式是:經自己在Reducer中寫出的格式:<2001, 32>,<2007, 34>...
    其中,由於輸入資料太小,Map過程的第1階段這裡不能證明。但事實上是這樣的。
    結論中第一點驗證了Map過程的第2階段:“鍵”是每一行的起始位置(單位是位元組),“值”是本行的文字內容。

    另外,通過Reduce的幾行

Before Reduce: 2001, 12, 10, 11, 29, 16, ======After Reduce: 2001, 29
Before Reduce: 2007, 23, 19, 12, 12, 99, ======After Reduce: 2007, 99
Before Reduce: 2008, 16, 14, 37, 16, 5, ======After Reduce: 2008, 37
Before Reduce: 2010, 10, 6, 14, 16, 17, ======After Reduce: 2010, 17
Before Reduce: 2012, 19, 12, 32, 9, 23, ======After Reduce: 2012, 32
Before Reduce: 2013, 23, 29, 12, 22, 19, ======After Reduce: 2013, 29
Before Reduce: 2014, 14, 6, 10, 17, 16, ======After Reduce: 2014, 17
Before Reduce: 2015, 23, 49, 22, 12, 99, ======After Reduce: 2015, 99


   可以證實Map過程的第4階段:先分割槽,然後對每個分割槽都執行一次Reduce(Map過程第6階段)。

    對於Mapper的輸出,前文中提到:如果沒有Reduce過程,Mapper的輸出會直接寫入檔案。於是我們把Reduce方法去掉(註釋掉第95行即可)。

    再執行,下面是控制檯列印結果:

Before Mapper: 0, 2014010114======After Mapper:2014, 14
Before Mapper: 11, 2014010216======After Mapper:2014, 16
Before Mapper: 22, 2014010317======After Mapper:2014, 17
Before Mapper: 33, 2014010410======After Mapper:2014, 10
Before Mapper: 44, 2014010506======After Mapper:2014, 6
Before Mapper: 55, 2012010609======After Mapper:2012, 9
Before Mapper: 66, 2012010732======After Mapper:2012, 32
Before Mapper: 77, 2012010812======After Mapper:2012, 12
Before Mapper: 88, 2012010919======After Mapper:2012, 19
Before Mapper: 99, 2012011023======After Mapper:2012, 23
Before Mapper: 110, 2001010116======After Mapper:2001, 16
Before Mapper: 121, 2001010212======After Mapper:2001, 12
Before Mapper: 132, 2001010310======After Mapper:2001, 10
Before Mapper: 143, 2001010411======After Mapper:2001, 11
Before Mapper: 154, 2001010529======After Mapper:2001, 29
Before Mapper: 165, 2013010619======After Mapper:2013, 19
Before Mapper: 176, 2013010722======After Mapper:2013, 22
Before Mapper: 187, 2013010812======After Mapper:2013, 12
Before Mapper: 198, 2013010929======After Mapper:2013, 29
Before Mapper: 209, 2013011023======After Mapper:2013, 23
Before Mapper: 220, 2008010105======After Mapper:2008, 5
Before Mapper: 231, 2008010216======After Mapper:2008, 16
Before Mapper: 242, 2008010337======After Mapper:2008, 37
Before Mapper: 253, 2008010414======After Mapper:2008, 14
Before Mapper: 264, 2008010516======After Mapper:2008, 16
Before Mapper: 275, 2007010619======After Mapper:2007, 19
Before Mapper: 286, 2007010712======After Mapper:2007, 12
Before Mapper: 297, 2007010812======After Mapper:2007, 12
Before Mapper: 308, 2007010999======After Mapper:2007, 99
Before Mapper: 319, 2007011023======After Mapper:2007, 23
Before Mapper: 330, 2010010114======After Mapper:2010, 14
Before Mapper: 341, 2010010216======After Mapper:2010, 16
Before Mapper: 352, 2010010317======After Mapper:2010, 17
Before Mapper: 363, 2010010410======After Mapper:2010, 10
Before Mapper: 374, 2010010506======After Mapper:2010, 6
Before Mapper: 385, 2015010649======After Mapper:2015, 49
Before Mapper: 396, 2015010722======After Mapper:2015, 22
Before Mapper: 407, 2015010812======After Mapper:2015, 12
Before Mapper: 418, 2015010999======After Mapper:2015, 99
Before Mapper: 429, 2015011023======After Mapper:2015, 23
Finished



    再來看看執行結果:

 
    結果還有很多行,沒有截圖了。

    由於沒有執行Reduce操作,因此這個就是Mapper輸出的中間檔案的內容了。
    從列印的日誌可以看出:


  • Mapper的輸出資料(k2, v2)格式是:經自己在Mapper中寫出的格式:<2010, 25>,<2012, 23>...
    從這個結果中可以看出,原資料檔案中的每一行確實都有一行輸出,那麼Map過程的第3階段就證實了。
    從這個結果中還可以看出,“年份”已經不是輸入給Mapper的順序了,這也說明了在Map過程中也按照Key執行了排序操作,即Map過程的第5階段。