1. 程式人生 > >Mapreduce執行過程分析(基於Hadoop2.4)——(一)

Mapreduce執行過程分析(基於Hadoop2.4)——(一)

轉載自:http://www.cnblogs.com/Scott007/p/3836687.html

1 概述

該瞅瞅MapReduce的內部執行原理了,以前只知道個皮毛,再不搞搞,不然怎麼死的都不曉得。下文會以2.4版本中的WordCount這個經典例子作為分析的切入點,一步步來看裡面到底是個什麼情況。

2 為什麼要使用MapReduce

Map/Reduce,是一種模式,適合解決平行計算的問題,比如TopN、貝葉斯分類等。注意,是平行計算,而非迭代計算,像涉及到層次聚類的問題就不太適合了。

從名字可以看出,這種模式有兩個步驟,Map和Reduce。Map即資料的對映,用於把一組鍵值對對映成另一組新的鍵值對,而Reduce這個東東,以Map階段的輸出結果作為輸入,對資料做化簡、合併等操作。

而MapReduce是Hadoop生態系統中基於底層HDFS的一個計算框架,它的上層又可以是Hive、Pig等資料倉庫框架,也可以是Mahout這樣的資料探勘工具。由於MapReduce依賴於HDFS,其運算過程中的資料等會儲存到HDFS上,把對資料集的計算分發給各個節點,並將結果進行彙總,再加上各種狀態彙報、心跳彙報等,其只適合做離線計算。和實時計算框架Storm、Spark等相比,速度上沒有優勢。舊的Hadoop生態幾乎是以MapReduce為核心的,但是慢慢的發展,其擴充套件性差、資源利用率低、可靠性等問題都越來越讓人覺得不爽,於是才產生了Yarn這個新的東東,並且二代版的Hadoop生態都是以Yarn為核心。Storm、Spark等都可以基於Yarn使用。

3 怎麼執行MapReduce

明白了哪些地方可以使用這個牛叉的MapReduce框架,那該怎麼用呢?Hadoop的MapReduce原始碼給我們提供了範例,在其hadoop-mapreduce-examples子工程中包含了MapReduce的Java版例子。在寫完類似的程式碼後,打包成jar,在HDFS的客戶端執行:

bin/hadoop jar mapreduce_examples.jar mainClass args

即可。當然,也可以在IDE(如Eclipse)中,進行遠端執行、除錯程式。

至於,HadoopStreaming方式,網上有很多。我們這裡只討論Java的實現。

4 如何編寫MapReduce程式

    如前文所說,MapReduce中有Map和Reduce,在實現MapReduce的過程中,主要分為這兩個階段,分別以兩類函式進行展現,一個是map函式,一個是reduce函式。map函式的引數是一個<key,value>鍵值對,其輸出結果也是鍵值對,reduce函式以map的輸出作為輸入進行處理。

4.1 程式碼構成

    實際的程式碼中,需要三個元素,分別是Map、Reduce、執行任務的程式碼。這裡的Map類是繼承了org.apache.hadoop.mapreduce.Mapper,並實現其中的map方法;而Reduce類是繼承了org.apache.hadoop.mapreduce.Reducer,實現其中的reduce方法。至於執行任務的程式碼,就是我們程式的入口。

    下面是Hadoop提供的WordCount原始碼。

複製程式碼
 1 /**
 2  * Licensed to the Apache Software Foundation (ASF) under one
 3  * or more contributor license agreements.  See the NOTICE file
 4  * distributed with this work for additional information
 5  * regarding copyright ownership.  The ASF licenses this file
 6  * to you under the Apache License, Version 2.0 (the
 7  * "License"); you may not use this file except in compliance
 8  * with the License.  You may obtain a copy of the License at
 9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  */
18 package org.apache.hadoop.examples;
19 
20 import java.io.IOException;
21 import java.util.StringTokenizer;
22 
23 import org.apache.hadoop.conf.Configuration;
24 import org.apache.hadoop.fs.Path;
25 import org.apache.hadoop.io.IntWritable;
26 import org.apache.hadoop.io.Text;
27 import org.apache.hadoop.mapreduce.Job;
28 import org.apache.hadoop.mapreduce.Mapper;
29 import org.apache.hadoop.mapreduce.Reducer;
30 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
31 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
32 import org.apache.hadoop.util.GenericOptionsParser;
33 
34 public class WordCount {
35 
36   public static class TokenizerMapper 
37        extends Mapper<Object, Text, Text, IntWritable>{
38     
39     private final static IntWritable one = new IntWritable(1);
40     private Text word = new Text();
41       
42     public void map(Object key, Text value, Context context
43                     ) throws IOException, InterruptedException {
44       StringTokenizer itr = new StringTokenizer(value.toString());
45       while (itr.hasMoreTokens()) {
46         word.set(itr.nextToken());
47         context.write(word, one);
48       }
49     }
50   }
51   
52   public static class IntSumReducer 
53        extends Reducer<Text,IntWritable,Text,IntWritable> {
54     private IntWritable result = new IntWritable();
55 
56     public void reduce(Text key, Iterable<IntWritable> values, 
57                        Context context
58                        ) throws IOException, InterruptedException {
59       int sum = 0;
60       for (IntWritable val : values) {
61         sum += val.get();
62       }
63       result.set(sum);
64       context.write(key, result);
65     }
66   }
67 
68   public static void main(String[] args) throws Exception {
69     Configuration conf = new Configuration();
70     String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
71     if (otherArgs.length != 2) {
72       System.err.println("Usage: wordcount <in> <out>");
73       System.exit(2);
74     }
75     Job job = new Job(conf, "word count");
76     job.setJarByClass(WordCount.class);
77     job.setMapperClass(TokenizerMapper.class);
78     job.setCombinerClass(IntSumReducer.class);
79     job.setReducerClass(IntSumReducer.class);
80     job.setOutputKeyClass(Text.class);
81     job.setOutputValueClass(IntWritable.class);
82     FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
83     FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
84     System.exit(job.waitForCompletion(true) ? 0 : 1);
85   }
86 }
複製程式碼

4.2 入口類

4.2.1 引數獲取

首先定義配置檔案類Configuration,此類是Hadoop各個模組的公共使用類,用於載入類路徑下的各種配置檔案,讀寫其中的配置選項。

    第二步中,用到了GenericOptionsParser類,其目的是將命令列中引數自動設定到變數conf中。

    GenericOptionsParser的構造方法進去之後,會進行到parseGeneralOptions,對傳入的引數進行解析:

複製程式碼
 1 private void parseGeneralOptions(Options opts, Configuration conf,
 2 
 3       String[] args) throws IOException {
 4 
 5     opts = buildGeneralOptions(opts);
 6 
 7     CommandLineParser parser = new GnuParser();
 8 
 9     try {
10 
11       commandLine = parser.parse(opts, preProcessForWindows(args), true);
12 
13       processGeneralOptions(conf, commandLine);
14 
15     } catch(ParseException e) {
16 
17       LOG.warn("options parsing failed: "+e.getMessage());
18 
19  
20 
21       HelpFormatter formatter = new HelpFormatter();
22 
23       formatter.printHelp("general options are: ", opts);
24 
25     }
26 
27   }
複製程式碼

   而getRemainingArgs方法會獲得傳入的引數,接著在main方法中會進行判斷引數的個數,由於此處是WordCount計算,只需要傳入檔案的輸入路徑和輸出路徑即可,因此引數的個數為2,否則將退出:

複製程式碼
1 if (otherArgs.length != 2) {
2 
3       System.err.println("Usage: wordcount <in> <out>");
4 
5       System.exit(2);
6 
7 }
複製程式碼

如果在程式碼執行的時候傳入其他的引數,比如指定reduce的個數,可以根據GenericOptionsParser的命令列格式這麼寫:

bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

其規則是-D加MapReduce的配置選項,當然還支援-fs等其他引數傳入。當然,預設情況下Reduce的數目為1,Map的數目也為1

4.2.2 Job定義

   定義Job物件,其構造方法為:

複製程式碼
1 public Job(Configuration conf, String jobName) throws IOException {
2 
3     this(conf);
4 
5     setJobName(jobName);
6 
7   }
複製程式碼

可見,傳入的"word count"就是Job的名字。而conf被傳遞給了JobConf進行環境變數的獲取:

複製程式碼
 1 public JobConf(Configuration conf) {
 2 
 3     super(conf);    
 6 
 7     if (conf instanceof JobConf) {
 8 
 9       JobConf that = (JobConf)conf;
10 
11       credentials = that.credentials;
12 
13     }
14      checkAndWarnDeprecation(); 
19   }
複製程式碼

    Job已經例項化了,下面就得給這個Job加點佐料才能讓它按照我們的要求執行。於是依次給Job新增啟動Jar包、設定Mapper類、設定合併類、設定Reducer類、設定輸出鍵型別、設定輸出值的型別。

    這裡有必要說下設定Jar包的這個方法setJarByClass:

複製程式碼
1 public void setJarByClass(Class<?> cls) {
2 
3     ensureState(JobState.DEFINE);
4 
5     conf.setJarByClass(cls);
6 
7   }
複製程式碼

它會首先判斷當前Job的狀態是否是執行中,接著通過class找到其所屬的jar檔案,將jar路徑賦值給mapreduce.job.jar屬性。至於尋找jar檔案的方法,則是通過classloader獲取類路徑下的資原始檔,進行迴圈遍歷。具體實現見ClassUtil類中的findContainingJar方法。

    搞完了上面的東西,緊接著就會給mapreduce.input.fileinputformat.inputdir引數賦值,這是Job的輸入路徑,還有mapreduce.input.fileinputformat.inputdir,這是Job的輸出路徑。具體的位置,就是我們前面main中傳入的Args。

4.2.3 Job提交

    萬事俱備,那就執行吧。

    這裡呼叫的方法如下:

複製程式碼
 1 public boolean waitForCompletion(boolean verbose
 2 
 3                                    ) throws IOException, InterruptedException,
 4 
 5                                             ClassNotFoundException {
 6 
 7     if (state == JobState.DEFINE) {
 8 
 9       submit();
10 
11     }
12 
13     if (verbose) {
14 
15       monitorAndPrintJob();
16 
17     } else {
18 
19       // get the completion poll interval from the client.
20 
21       int completionPollIntervalMillis =
22 
23         Job.getCompletionPollInterval(cluster.getConf());
24 
25       while (!isComplete()) {
26 
27         try {
28 
29           Thread.sleep(completionPollIntervalMillis);
30 
31         } catch (InterruptedException ie) {
32 
33         }
34 
35       }
36 
37     }
38 
39     return isSuccessful();
40 
41   }
複製程式碼

至於方法的引數verbose,如果想在控制檯列印當前的進度,則設定為true。

   至於submit方法,如果當前在HDFS的配置檔案中配置了mapreduce.framework.name屬性為“yarn”的話,會建立一個YARNRunner物件來進行任務的提交。其構造方法如下:

複製程式碼
 1 public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate,
 2 
 3       ClientCache clientCache) {
 4 
 5     this.conf = conf;
 6 
 7     try {
 8 
 9       this.resMgrDelegate = resMgrDelegate;
10 
11       this.clientCache = clientCache;
12 
13       this.defaultFileContext = FileContext.getFileContext(this.conf);
14 
15     } catch (UnsupportedFileSystemException ufe) {
16 
17       throw new RuntimeException("Error in instantiating YarnClient", ufe);
18 
19     }
20 
21   }
複製程式碼

其中,ResourceMgrDelegate實際上ResourceManager的代理類,其實現了YarnClient介面,通過ApplicationClientProtocol代理直接向RM提交Job,殺死Job,檢視Job執行狀態等操作。同時,在ResourceMgrDelegate類中會通過YarnConfiguration來讀取yarn-site.xml、core-site.xml等配置檔案中的配置屬性。

   下面就到了客戶端最關鍵的時刻了,提交Job到叢集執行。具體實現類是JobSubmitter類中的submitJobInternal方法。這個牛氣哄哄的方法寫了100多行,還不算其幾十行的註釋。我們看它幹了點啥。

Step1:

檢查job的輸出路徑是否存在,如果存在則丟擲異常。

Step2:

初始化用於存放Job相關資源的路徑。注意此路徑的構造方式為:

複製程式碼
1 conf.get(MRJobConfig.MR_AM_STAGING_DIR,
2 
3         MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
4 
5         + Path.SEPARATOR + user
6 
7 + Path.SEPARATOR + STAGING_CONSTANT
複製程式碼

其中,MRJobConfig.DEFAULT_MR_AM_STAGING_DIR為“/tmp/hadoop-yarn/staging”,STAGING_CONSTANT為".staging"。

Step3:

設定客戶端的host屬性:mapreduce.job.submithostname和mapreduce.job.submithostaddress

Step4:

通過RPC,向Yarn的ResourceManager申請JobID物件。

Step5:

從HDFS的NameNode獲取驗證用的Token,並將其放入快取。

Step6:

將作業檔案上傳到HDFS,這裡如果我們前面沒有對Job命名的話,預設的名稱就會在這裡設定成jar的名字。並且,作業預設的副本數是10,如果屬性mapreduce.client.submit.file.replication沒有被設定的話。

Step7:

檔案上傳到HDFS之後,還要被DistributedCache進行快取起來。這是因為計算節點收到該作業的第一個任務後,就會有DistributedCache自動將作業檔案Cache到節點本地目錄下,並且會對壓縮檔案進行解壓,如:.zip,.jar,.tar等等,然後開始任務。

最後,對於同一個計算節點接下來收到的任務,DistributedCache不會重複去下載作業檔案,而是直接執行任務。如果一個作業的任務數很多,這種設計避免了在同一個節點上對用一個job的檔案會下載多次,大大提高了任務執行的效率。

Step8:

對每個輸入檔案進行split劃分。注意這只是個邏輯的劃分,不是物理的。因為此處是輸入檔案,因此執行的是FileInputFormat類中的getSplits方法。只有非壓縮的檔案和幾種特定壓縮方式壓縮後的檔案才分片。分片的大小由如下幾個引數決定:mapreduce.input.fileinputformat.split.maxsizemapreduce.input.fileinputformat.split.minsize、檔案的塊大小。

具體計算方式為:

Math.max(minSize, Math.min(maxSize, blockSize))

分片的大小有可能比預設塊大小64M要大,當然也有可能小於它,預設情況下分片大小為當前HDFS的塊大小,64M。

   接下來就該正兒八經的獲取分片詳情了。程式碼如下:

複製程式碼
 1           long bytesRemaining = length; 2 
 3           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
 4 
 5             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
 6 
 7             splits.add(makeSplit(path, length-bytesRemaining, splitSize, 
 9