1. 程式人生 > >分布式計算框架MapReduce

分布式計算框架MapReduce

Hadoop MapReduce 大數據 分布式計算框架 JobHistory

MapReduce概述

MapReduce源自Google的MapReduce論文,論文發表於2004年12月。Hadoop MapReduce可以說是Google MapReduce的一個開源實現。MapReduce優點在於可以將海量的數據進行離線處理,並且MapReduce也易於開發,因為MapReduce框架幫我們封裝好了分布式計算的開發。而且對硬件設施要求不高,可以運行在廉價的機器上。MapReduce也有缺點,它最主要的缺點就是無法完成實時流式計算,只能離線處理。

MapReduce屬於一種編程模型,用於大規模數據集(大於1TB)的並行運算。概念"Map(映射)"和"Reduce(歸約)",是它們的主要思想,都是從函數式編程語言裏借來的,還有從矢量編程語言裏借來的特性。它極大地方便了編程人員在不會分布式並行編程的情況下,將自己的程序運行在分布式系統上。 當前的軟件實現是指定一個Map(映射)函數,用來把一組鍵值對映射成一組新的鍵值對,指定並發的Reduce(歸約)函數,用來保證所有映射的鍵值對中的每一個共享相同的鍵組。

MapReduce官方文檔地址如下:

https://hadoop.apache.org/docs/stable/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html

在學習MapReduce之前我們需要準備好Hadoop的環境,也就是需要先安裝好HDFS以及YARN,環境的搭建方式可以參考我之前的兩篇文章:HDFS偽分布式環境搭建 以及 分布式資源調度——YARN框架


從WordCount案例說起MapReduce編程模型

在安裝Hadoop時,它就自帶有一個WordCount的案例,這個案例是統計文件中每個單詞出現的次數,也就是詞頻統計,我們在學習大數據開發時,一般都以WordCount作為入門。

例如,我現在有一個test.txt,文件內容如下:

hello world
hello hadoop
hello MapReduce

現在的需求是統計這個文件中每個單詞出現的次數。假設我現在寫了一些代碼實現了這個文件的詞頻統計,統計的結果如下:

hello 3
world 1
hadoop 1
MapReduce 1

以上這就是一個詞頻統計的例子。

詞頻統計看起來貌似很簡單的樣子,一般不需要多少代碼就能完成了,而且如果對shell腳本比較熟悉的話,甚至一句代碼就能完成這個詞頻統計的功能。確實詞頻統計是不難,但是為什麽還要用大數據技術去完成這個詞頻統計的功能呢?這是因為實現小文件的詞頻統計功能或許用簡單的代碼就能完成,但是如果是幾百GB、TB甚至是PB級的大文件還能用簡單的代碼完成嗎?這顯然是不可能的,就算能也需要花費相當大的時間成本。

而大數據技術就是要解決這種處理海量數據的問題,MapReduce在其中就是充當一個分布式並行計算的角色,分布式並行計算能大幅度提高海量數據的處理速度,畢竟多個人幹活肯定比一個人幹活快。又回到我們上面所說到的詞頻統計的例子,在實際工作中很多場景的開發都是在WordCount的基礎上進行改造的。例如,要從所有服務器的訪問日誌中統計出被訪問得最多的url以及訪問量最高的IP,這就是一個典型的WordCount應用場景,要知道即便是小公司的服務器訪問日誌通常也都是GB級別的。

使用MapReduce執行WordCount的流程示意圖:
技術分享圖片

從上圖中,可以看到,輸入的數據集會被拆分為多個塊,然後這些塊都會被放到不同的節點上進行並行的計算。在Splitting這一環節會把單詞按照分割符或者分割規則進行拆分,拆分完成後就到Mapping上了,到Mapping這個環節後會把相同的單詞通過網絡進行映射或者說傳輸到同一個節點上。接著這些相同的單詞就會在Shuffling環節時進行洗牌也就是合並,合並完成之後就會進入Reducing環節,這一環節就是把所有節點合並後的單詞再進行一次合並,也就是會輸出到HDFS文件系統中的某一個文件中。大體來看就是一個拆分又合並的過程,所以MapReduce是分為map和Reduce的。最重要的是,要清楚這一流程都是分布式並行的,每個節點都不會互相依賴,都是相互獨立的。


MapReduce執行流程

以上我們也提到了MapReduce是分為Map和Reduce的,也就是說一個MapReduce作業會被拆分成Map和Reduce階段。Map階段對應的就是一堆的Map Tasks,同樣的Reduce階段也是會對應一堆的Reduce Tasks。

其實簡單來說這也是一個輸入輸出的流程,要註意的是在MapReduce框架中輸入的數據集會被序列化成鍵/值對,map階段完成後會對這些鍵值對進行排序,最後到reduce階段中進行合並輸出,輸出的也是鍵/值對,官網文檔寫的流程如下:

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

示意圖:
技術分享圖片

我們可以看到有幾個主要的點:

  • InputFormat:將我們輸入數據進行分片(split)
  • Split:將數據塊交MapReduce作業來處理,數據塊是MapReduce中最小的計算單元
    • 在HDFS中,數據塊是最小的存儲單元,默認為128M
    • 默認情況下,HDFS與MapReduce是一一對應的,當然我們也可以手動所設置它們之間的關系(但是不建議這麽做)
  • OutputFormat:輸出最終的處理結果

我們可以再來看一張圖,假設我們手動設置了block與split的對應關系,一個block對應兩個split:
技術分享圖片

上圖中一個block對應兩個split(默認是一對一),一個split則是對應一個Map Task。Map Task將數據分完組之後到Shuffle,Shuffle完成後就到Reduce上進行輸出,而每一個Reduce Tasks會輸出到一個文件上,上圖中有三個Reduce Tasks,所以會輸出到三個文件上。


MapReduce1.x架構

MapReduce1.x架構圖如下:
技術分享圖片

簡單說明一下其中的幾個組件:

  1. JobTracker:作業的管理者,它會將作業分解成一堆的任務,也就是Task,Task裏包含MapTask和ReduceTask。它會將分解後的任務分派給TaskTracker進行運行,它還需要完成作業的監控以及容錯處理(task作業掛掉了,會重啟task)。如果在一定的時間內,JobTracker沒有收到某個TaskTracker的心跳信息的話,就會判斷該TaskTracker掛掉了,然後就會將該TaskTracker上運行的任務指派到其他的TaskTracker上去執行。
  2. TaskTracker:任務的執行者,我們的Task(MapTask和ReduceTask)都是在TaskTracker上運行的,TaskTracker可以與JobTracker進行交互,例如執行、啟動或停止作業以及發送心跳信息給JobTracker等。
  3. MapTask:我們自己開發的Map任務會交由該Task完成,它會解析每條記錄的數據,然後交給自己編寫的Map方法進行處理,處理完成之後會將Map的輸出結果寫到本地磁盤。不過有些作業可能只有map沒有reduce,這時候一般會將結果輸出到HDFS文件系統裏。
  4. ReduceTask:將MapTask輸出的數據進行讀取,並按照數據的規則進行分組,然後傳給我們自己編寫的reduce方法處理。處理完成後默認將輸出結果寫到HDFS。

MapReduce2.x架構

MapReduce2.x架構圖如下,可以看到JobTracker和TaskTracker已經不復存在了,取而代之的是ResourceManager和NodeManager。不僅架構變了,功能也變了,2.x之後新引入了YARN,在YARN之上我們可以運行不同的計算框架,不再是1.x那樣只能運行MapReduce了:
技術分享圖片

關於MapReduce2.x的架構之前已經在分布式資源調度——YARN框架一文中說明過了,這裏就不再贅述了。


Java版本wordcount功能實現

1.創建一個Maven工程,配置依賴如下:

<repositories>
    <repository>
      <id>cloudera</id>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
      <releases>
        <enabled>true</enabled>
      </releases>
      <snapshots>
        <enabled>false</enabled>
      </snapshots>
    </repository>
  </repositories>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hadoop.version>2.6.0-cdh5.7.0</hadoop.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.10</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

2.創建一個類,開始編寫我們wordcount的實現代碼:

package org.zero01.hadoop.mapreduce;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @program: hadoop-train
 * @description: 使用MapReduce開發WordCount應用程序
 * @author: 01
 * @create: 2018-03-31 14:03
 **/
public class WordCountApp {

    /**
     * Map: 讀取輸入的文件內容
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        LongWritable one = new LongWritable(1);

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 接收到的每一行數據
            String line = value.toString();

            // 按照指定的分割符進行拆分
            String[] words = line.split(" ");
            for (String word : words) {
                // 通過上下文把map的處理結果輸出
                context.write(new Text((word)), one);
            }
        }
    }

    /**
     * Reduce: 歸並操作
     */
    public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            long sum = 0;
            for (LongWritable value : values) {
                // 求key出現的次數總和
                sum += value.get();
            }
            // 將最終的統計結果輸出
            context.write(key, new LongWritable(sum));
        }
    }

    /**
     * 定義Driver:封裝了MapReduce作業的所有信息
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        // 創建Job,通過參數設置Job的名稱
        Job job = Job.getInstance(configuration, "wordcount");

        // 設置Job的處理類
        job.setJarByClass(WordCountApp.class);

        // 設置作業處理的輸入路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        // 設置map相關參數
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 設置reduce相關參數
        job.setReducerClass(MyReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 設置作業處理完成後的輸出路徑
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

3.編寫完成之後,在IDEA裏通過Maven進行編譯打包:
技術分享圖片

4.把打包好的jar包上傳到服務器上:
技術分享圖片

測試文件內容如下:

[root@localhost ~]# hdfs dfs -text /test.txt
hello world
hadoop welcome
hadoop hdfs mapreduce
hadoop hdfs
hello hadoop
[root@localhost ~]# 

5.然後執行如下命令執行Job:

[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc

簡單說明一下這個命令:

  • hadoop jar 是Hadoop執行jar包的命令
  • ./hadoop-train-1.0.jar 是jar包的所在路徑
  • org.zero01.hadoop.mapreduce.WordCountApp 是jar包的主類也就是main類
  • /test.txt 是測試文件也就是輸入文件所在路徑(HDFS上的路徑)
  • /output/wc 為輸出文件的存在路徑

6.到YARN上查看任務執行的信息:

申請資源:
技術分享圖片

運行:
技術分享圖片

完成:
技術分享圖片

7.可以看到已經執行成功,命令行終端的日誌輸出內容如下:

18/03/31 22:55:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/03/31 22:55:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/03/31 22:55:52 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
18/03/31 22:55:53 INFO input.FileInputFormat: Total input paths to process : 1
18/03/31 22:55:53 INFO mapreduce.JobSubmitter: number of splits:1
18/03/31 22:55:53 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1522505784761_0001
18/03/31 22:55:54 INFO impl.YarnClientImpl: Submitted application application_1522505784761_0001
18/03/31 22:55:54 INFO mapreduce.Job: The url to track the job: http://localhost:8088/proxy/application_1522505784761_0001/
18/03/31 22:55:54 INFO mapreduce.Job: Running job: job_1522505784761_0001
18/03/31 22:56:06 INFO mapreduce.Job: Job job_1522505784761_0001 running in uber mode : false
18/03/31 22:56:06 INFO mapreduce.Job:  map 0% reduce 0%
18/03/31 22:56:11 INFO mapreduce.Job:  map 100% reduce 0%
18/03/31 22:56:16 INFO mapreduce.Job:  map 100% reduce 100%
18/03/31 22:56:16 INFO mapreduce.Job: Job job_1522505784761_0001 completed successfully
18/03/31 22:56:16 INFO mapreduce.Job: Counters: 49
    File System Counters
        FILE: Number of bytes read=190
        FILE: Number of bytes written=223169
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=174
        HDFS: Number of bytes written=54
        HDFS: Number of read operations=6
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=2
    Job Counters 
        Launched map tasks=1
        Launched reduce tasks=1
        Data-local map tasks=1
        Total time spent by all maps in occupied slots (ms)=3151
        Total time spent by all reduces in occupied slots (ms)=2359
        Total time spent by all map tasks (ms)=3151
        Total time spent by all reduce tasks (ms)=2359
        Total vcore-seconds taken by all map tasks=3151
        Total vcore-seconds taken by all reduce tasks=2359
        Total megabyte-seconds taken by all map tasks=3226624
        Total megabyte-seconds taken by all reduce tasks=2415616
    Map-Reduce Framework
        Map input records=5
        Map output records=11
        Map output bytes=162
        Map output materialized bytes=190
        Input split bytes=100
        Combine input records=0
        Combine output records=0
        Reduce input groups=6
        Reduce shuffle bytes=190
        Reduce input records=11
        Reduce output records=6
        Spilled Records=22
        Shuffled Maps =1
        Failed Shuffles=0
        Merged Map outputs=1
        GC time elapsed (ms)=233
        CPU time spent (ms)=1860
        Physical memory (bytes) snapshot=514777088
        Virtual memory (bytes) snapshot=5571788800
        Total committed heap usage (bytes)=471859200
    Shuffle Errors
        BAD_ID=0
        CONNECTION=0
        IO_ERROR=0
        WRONG_LENGTH=0
        WRONG_MAP=0
        WRONG_REDUCE=0
    File Input Format Counters 
        Bytes Read=74
    File Output Format Counters 
        Bytes Written=54

8.查看輸出文件的內容:

[root@localhost ~]# hdfs dfs -ls /output/wc/
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-03-31 22:56 /output/wc/_SUCCESS
-rw-r--r--   1 root supergroup         54 2018-03-31 22:56 /output/wc/part-r-00000  # 執行結果的輸出文件
[root@localhost ~]# hdfs dfs -text /output/wc/part-r-00000  # 查看文件內容
hadoop  4
hdfs    2
hello   2
mapreduce   1
welcome 1
world   1
[root@localhost ~]# 

Java版本wordcount功能重構

雖然我們已經成功通過編寫java代碼實現了wordcount功能,但是有一個問題,如果我們再執行剛剛那條命令,就會報如下錯誤:

[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc
18/04/01 00:30:12 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/04/01 00:30:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
18/04/01 00:30:12 WARN security.UserGroupInformation: PriviledgedActionException as:root (auth:SIMPLE) cause:org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.77.130:8020/output/wc already exists
Exception in thread "main" org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory hdfs://192.168.77.130:8020/output/wc already exists
    at org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.checkOutputSpecs(FileOutputFormat.java:146)
    at org.apache.hadoop.mapreduce.JobSubmitter.checkSpecs(JobSubmitter.java:270)
    at org.apache.hadoop.mapreduce.JobSubmitter.submitJobInternal(JobSubmitter.java:143)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1307)
    at org.apache.hadoop.mapreduce.Job$10.run(Job.java:1304)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1693)
    at org.apache.hadoop.mapreduce.Job.submit(Job.java:1304)
    at org.apache.hadoop.mapreduce.Job.waitForCompletion(Job.java:1325)
    at org.zero01.hadoop.mapreduce.WordCountApp.main(WordCountApp.java:86)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.hadoop.util.RunJar.run(RunJar.java:221)
    at org.apache.hadoop.util.RunJar.main(RunJar.java:136)
[root@localhost ~]# 

在平時的MapReduce據程序開發中,這個異常非常地常見,這個異常是因為輸出文件的存放目錄已經存在:Output directory hdfs://192.168.77.130:8020/output/wc already exists

有兩種方式可以解決這個問題:

  1. 在執行MapReduce作業時,先刪除或更改輸出文件的存放目錄(不推薦)
  2. 在代碼中完成自動刪除功能(推薦)

我們來在代碼中實現自動刪除功能,在剛剛的代碼中,加入如下內容:

...
/**
 * 定義Driver:封裝了MapReduce作業的所有信息
 */
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    Configuration configuration = new Configuration();

    // 準備清理已存在的輸出目錄
    Path outputPath = new Path(args[1]);
    FileSystem fileSystem = FileSystem.get(configuration);
    if (fileSystem.exists(outputPath)) {
        fileSystem.delete(outputPath,true);
        System.out.println("output file exists, but is has deleted");
    }
...

編寫完成之後重新將編輯後的jar包上傳,再執行hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /test.txt /output/wc命令,就不會再報錯了。


Combiner應用程序開發

Combiner類似於本地的Reduce,相當於是在Map階段的時候就做一個Reduce的操作,它能夠減少Map Task輸出的數據量及網絡傳輸量。

如下圖:
技術分享圖片

在上圖中,可以看到Mapper與Reducer之間有一層Combiner。Mapper先把數據在本地進行一個Combiner,也就是先做一個本地數據的合並,這個過程類似於Reduce只不過是本地的,也即是本節點。當Combiner合並完成之後,再把數據傳輸到Reducer上再一次進行最終的合並。這樣Map Task輸出的數據量就會大大減少,性能也會相應的提高,這一點可以從上圖中看到。

我們來嘗試一下在剛才開發的wordcount程序中,增加一層Combiner。增加Combiner很簡單,只需要在設置map和reduce參數的代碼之間增加一行代碼即可,如下:

// 通過Job對象來設置Combiner處理類,在邏輯上和reduce是一樣的
job.setCombinerClass(MyReducer.class);

修改完成並重新上傳jar包後,這時再執行wordcount程序,在終端的日誌輸出信息中,會發現Combiner相關的字段都有值,那麽就代表我們的Combiner已經成功添加進去了:
技術分享圖片

Combiner的適用場景:

  • 求和、計數,累計類型的場景適合使用

Combiner的不適用的場景:

  • 求平均數、求公約數等類型的操作不適合,如果這種場景下使用Combiner,得到的結果就是錯誤的

Partitioner應用程序開發

Partitioner決定Map Task輸出的數據交由哪個Reduce Task處理,也就是類似於制定一個分發規則。默認情況下的分發規則實現:分發的key的hash值對Reduce Task個數取模。

如下圖:
技術分享圖片

上圖中,把圓形數據放到了同一個Reduce Task上,把六邊形數據放到了同一個Reduce Task上,剩下的圖形數據則放到剩下的Reduce Task上, 這樣的一個分發過程就是Partitioner。

例如,我現在有一組數據如下,這是今日各個手機品牌的銷售量:

[root@localhost ~]# hdfs dfs -text /partitioner.txt 
xiaomi 200
huawei 300
xiaomi 100
iphone7 300
iphone7 500
nokia 100
[root@localhost ~]#

現在我有一個需求,就是將相同品牌的手機名稱,分發到同一個Reduce上進行處理。這就需要用到Partitioner了,在我們之前的代碼中增加如下內容:

public class WordCountApp {
    /**
     * Map: 讀取輸入的文件內容
     */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> {

        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 接收到的每一行數據
            String line = value.toString();

            // 按照指定的分割符進行拆分
            String[] words = line.split(" ");
            // 通過上下文把map的處理結果輸出
            context.write(new Text((words[0])), new LongWritable(Long.parseLong(words[1])));
        }
    }

    ...

    /**
     * Partitioner: 設定Map Task輸出的數據的分發規則
     */
    public static class MyPartitioner extends Partitioner<Text, LongWritable> {

        public int getPartition(Text key, LongWritable value, int numPartitions) {
            if(key.toString().equals("xiaomi")){
                return 0;
            }
            if(key.toString().equals("huawei")){
                return 1;
            }
            if(key.toString().equals("iphone7")) {
                return 2;
            }
            return 3;
        }
    }

    /**
     * 定義Driver:封裝了MapReduce作業的所有信息
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        ...
        // 設置Job的partition
        job.setPartitionerClass(MyPartitioner.class);
        // 設置4個reducer,每個分區一個
        job.setNumReduceTasks(4);
        ...
    }
}

同樣的,修改了代碼後需要重新編譯打包,把新的jar上傳到服務器上。然後執行命令:

[root@localhost ~]# hadoop jar ./hadoop-train-1.0.jar org.zero01.hadoop.mapreduce.WordCountApp /partitioner.txt /output/wc

執行成功,此時可以看到/output/wc/目錄下有四個結果文件,這是因為我們在代碼上設置了4個reducer,並且可以看到內容都是正確的:

[root@localhost ~]# hdfs dfs -ls /output/wc/
Found 5 items
-rw-r--r--   1 root supergroup          0 2018-04-01 04:37 /output/wc/_SUCCESS
-rw-r--r--   1 root supergroup         11 2018-04-01 04:37 /output/wc/part-r-00000
-rw-r--r--   1 root supergroup         11 2018-04-01 04:37 /output/wc/part-r-00001
-rw-r--r--   1 root supergroup         13 2018-04-01 04:37 /output/wc/part-r-00002
-rw-r--r--   1 root supergroup         10 2018-04-01 04:37 /output/wc/part-r-00003
[root@localhost ~]# for i in `seq 0 3`; do hdfs dfs -text /output/wc/part-r-0000$i; done
xiaomi  300
huawei  300
iphone7 800
nokia   100
[root@localhost ~]# 

JobHistory的配置

JobHistory是一個Hadoop自帶的歷史服務器,它用於記錄已運行完的MapReduce信息到指定的HDFS目錄下。我們都知道,執行了MapReduce任務後,可以在YARN的管理頁面上查看到任務的相關信息,但是由於JobHistory默認情況下是不開啟的,所以我們無法通過點擊History查看歷史信息:
技術分享圖片

所以我們就需要打開這個服務,編輯配置文件內容:

[root@localhost ~]# cd /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop]# vim mapred-site.xml  # 增加如下內容
<!-- jobhistory的通信地址 -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>192.168.77.130:10020</value>
    <description>MapReduce JobHistory Server IPC host:port</description>
</property>
<!-- jobhistory的web訪問地址 -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>192.168.77.130:19888</value>
    <description>MapReduce JobHistory Server IPC host:port</description>
</property>
<!-- 任務運行完成後,history信息所存放的目錄 -->
<property>
    <name>mapreduce.jobhistory.done-dir</name>
    <value>/history/done</value>
</property>
<!-- 任務運行中,history信息所存放的目錄 -->
<property>
    <name>mapreduce.jobhistory.intermediate-done-dir</name>
    <value>/history/done_intermediate</value>
</property>
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop]# vim yarn-site.xml  # 增加如下內容
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<property>
    <name>yarn.log.server.url</name>
    <value>http://192.168.77.130:19888/jobhistory</value>
</property>
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/etc/hadoop]# 

編輯完配置文件後,重新啟動YARN服務:

[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# ./stop-yarn.sh 
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# ./start-yarn.sh 

啟動JobHistory服務:

[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# ./mr-jobhistory-daemon.sh start historyserver
starting historyserver, logging to /usr/local/hadoop-2.6.0-cdh5.7.0/logs/mapred-root-historyserver-localhost.out
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# 

檢查進程:

[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]# jps
2945 DataNode
12946 JobHistoryServer
3124 SecondaryNameNode
12569 NodeManager
13001 Jps
2812 NameNode
12463 ResourceManager
[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/sbin]#

然後執行一個案例測試一下:

[root@localhost /usr/local/hadoop-2.6.0-cdh5.7.0/share/hadoop/mapreduce]# hadoop jar ./hadoop-mapreduce-examples-2.6.0-cdh5.7.0.jar pi 3 4

任務執行成功後,這時候訪問http://192.168.77.130:19888就可以進入到JobHistory的web頁面了:
技術分享圖片

能夠正常訪問就代表配置已經成功了,現在所有任務的執行日誌都可以在這裏進行查看,有利於我們日常開發中的排錯,而且ui界面操作起來也要方便一些。

分布式計算框架MapReduce