1. 程式人生 > >雲端計算_Hadoop_Wordcount程式執行詳解

雲端計算_Hadoop_Wordcount程式執行詳解

@轉載自:https://www.cnblogs.com/xia520pi/archive/2012/05/16/2504205.html

@轉載原因:方便自己檢視;CSDN其他使用者查閱。

1、MapReduce理論簡介

 

1.1 MapReduce程式設計模型

  MapReduce採用"分而治之"的思想,把對大規模資料集的操作,分發給一個主節點管理下的各個分節點共同完成,然後通過整合各個節點的中間結果,得到最終結果。簡單地說,MapReduce就是"任務的分解與結果的彙總"。

  在Hadoop中,用於執行MapReduce任務的機器角色有兩個:一個是JobTracker;另一個是TaskTracker,JobTracker是用於排程工作的,TaskTracker是用於執行工作的。一個Hadoop叢集中只有一臺JobTracker。

  在分散式計算中,MapReduce框架負責處理了並行程式設計中分散式儲存、工作排程、負載均衡、容錯均衡、容錯處理以及網路通訊等複雜問題,把處理過程高度抽象為兩個函式:map和reduce,map負責把任務分解成多個任務,reduce負責把分解後多工處理的結果彙總起來。

  需要注意的是,用MapReduce來處理的資料集(或任務)必須具備這樣的特點:待處理的資料集可以分解成許多小的資料集,而且每一個小資料集都可以完全並行地進行處理。

 

1.2 MapReduce處理過程

  在Hadoop中,每個MapReduce任務都被初始化為一個Job,每個Job又可以分為兩種階段:map階段和reduce階段。這兩個階段分別用兩個函式表示,即map函式和reduce函式。map函式接收一個<key,value>形式的輸入,然後同樣產生一個<key,value>形式的中間輸出,Hadoop函式接收一個如<key,(list of values)>形式的輸入,然後對這個value集合進行處理,每個reduce產生0或1個輸出,reduce的輸出也是<key,value>形式的。

 

 image

MapReduce處理大資料集的過程

 

 

2、執行WordCount程式

  單詞計數是最簡單也是最能體現MapReduce思想的程式之一,可以稱為MapReduce版"Hello World",該程式的完整程式碼可以在Hadoop安裝包的"src/examples"目錄下找到。單詞計數主要完成功能是:統計一系列文字檔案中每個單詞出現的次數,如下圖所示。

 

 image

 

2.1 準備工作

  現在以"hadoop"普通使用者登入"Master.Hadoop"伺服器。

  1)建立本地示例檔案

  首先在"/home/hadoop"目錄下建立資料夾"file"。

 

 image

 

  接著建立兩個文字檔案file1.txt和file2.txt,使file1.txt內容為"Hello World",而file2.txt的內容為"Hello Hadoop"。

 

image

 

  2)在HDFS上建立輸入資料夾

 

image

 

  3)上傳本地file中檔案到叢集的input目錄下

 

image

 

2.2 執行例子

  1)在叢集上執行WordCount程式

  備註:以input作為輸入目錄,output目錄作為輸出目錄。

  已經編譯好的WordCount的Jar在"/usr/hadoop"下面,就是"hadoop-examples-1.0.0.jar",所以在下面執行命令時記得把路徑寫全了,不然會提示找不到該Jar包。

 

 image

 

  2)MapReduce執行過程顯示資訊

 

image

 

  Hadoop命令會啟動一個JVM來執行這個MapReduce程式,並自動獲得Hadoop的配置,同時把類的路徑(及其依賴關係)加入到Hadoop的庫中。以上就是Hadoop Job的執行記錄,從這裡可以看到,這個Job被賦予了一個ID號:job_201202292213_0002,而且得知輸入檔案有兩個(Total input paths to process : 2),同時還可以瞭解map的輸入輸出記錄(record數及位元組數),以及reduce輸入輸出記錄。比如說,在本例中,map的task數量是2個,reduce的task數量是一個。map的輸入record數是2個,輸出record數是4個等資訊。

 

2.3 檢視結果

  1)檢視HDFS上output目錄內容

 

image

 

  從上圖中知道生成了三個檔案,我們的結果在"part-r-00000"中。

  2)檢視結果輸出檔案內容

 

image

 

3、WordCount原始碼分析

 

3.1 特別資料型別介紹

  Hadoop提供瞭如下內容的資料型別,這些資料型別都實現了WritableComparable介面,以便用這些型別定義的資料可以被序列化進行網路傳輸和檔案儲存,以及進行大小比較。

 

    BooleanWritable:標準布林型數值

    ByteWritable:單位元組數值

    DoubleWritable:雙位元組數

    FloatWritable:浮點數

    IntWritable:整型數

    LongWritable:長整型數

    Text:使用UTF8格式儲存的文字

    NullWritable:當<key,value>中的key或value為空時使用

 

3.2 舊的WordCount分析

  1)原始碼程式

 

package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.Iterator;

import java.util.StringTokenizer;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCount {

    public static class Map extends MapReduceBase implements

            Mapper<LongWritable, Text, Text, IntWritable> {

        private final static IntWritable one = new IntWritable(1);

        private Text word = new Text();

        public void map(LongWritable key, Text value,

                OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            String line = value.toString();

            StringTokenizer tokenizer = new StringTokenizer(line);

            while (tokenizer.hasMoreTokens()) {

                word.set(tokenizer.nextToken());

                output.collect(word, one);

            }

        }

    }

    public static class Reduce extends MapReduceBase implements

            Reducer<Text, IntWritable, Text, IntWritable> {

        public void reduce(Text key, Iterator<IntWritable> values,

                OutputCollector<Text, IntWritable> output, Reporter reporter)

                throws IOException {

            int sum = 0;

            while (values.hasNext()) {

                sum += values.next().get();

            }

            output.collect(key, new IntWritable(sum));

        }

    }

    public static void main(String[] args) throws Exception {

        JobConf conf = new JobConf(WordCount.class);

        conf.setJobName("wordcount");

        conf.setOutputKeyClass(Text.class);

        conf.setOutputValueClass(IntWritable.class);

        conf.setMapperClass(Map.class);

        conf.setCombinerClass(Reduce.class);

        conf.setReducerClass(Reduce.class);

        conf.setInputFormat(TextInputFormat.class);

        conf.setOutputFormat(TextOutputFormat.class);

        FileInputFormat.setInputPaths(conf, new Path(args[0]));

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

        JobClient.runJob(conf);

    }

}

 

  3)主方法Main分析

 

public static void main(String[] args) throws Exception {

    JobConf conf = new JobConf(WordCount.class);

    conf.setJobName("wordcount");

    conf.setOutputKeyClass(Text.class);

    conf.setOutputValueClass(IntWritable.class);

    conf.setMapperClass(Map.class);

    conf.setCombinerClass(Reduce.class);

    conf.setReducerClass(Reduce.class);

    conf.setInputFormat(TextInputFormat.class);

    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path(args[0]));

    FileOutputFormat.setOutputPath(conf, new Path(args[1]));

    JobClient.runJob(conf);

}

 

  首先講解一下Job初始化過程main函式呼叫Jobconf類來對MapReduce Job進行初始化,然後呼叫setJobName()方法命名這個Job。對Job進行合理的命名有助於更快地找到Job,以便在JobTracker和Tasktracker的頁面中對其進行監視

 

JobConf conf = new JobConf(WordCount. class ); conf.setJobName("wordcount" );

 

  接著設定Job輸出結果<key,value>的中key和value資料型別,因為結果是<單詞,個數>,所以key設定為"Text"型別,相當於Java中String型別。Value設定為"IntWritable",相當於Java中的int型別。

 

conf.setOutputKeyClass(Text.class );

conf.setOutputValueClass(IntWritable.class );

 

  然後設定Job處理的Map(拆分)、Combiner(中間結果合併)以及Reduce(合併)的相關處理類。這裡用Reduce類來進行Map產生的中間結果合併,避免給網路資料傳輸產生壓力。

 

conf.setMapperClass(Map.class );

conf.setCombinerClass(Reduce.class );

conf.setReducerClass(Reduce.class );

 

  接著就是呼叫setInputPath()和setOutputPath()設定輸入輸出路徑。

 

conf.setInputFormat(TextInputFormat.class );

conf.setOutputFormat(TextOutputFormat.class );

 

  (1)InputFormat和InputSplit

  InputSplit是Hadoop定義的用來傳送給每個單獨map資料,InputSplit儲存的並資料本身而是一個分片長度和一個記錄資料位置陣列生成InputSplit的方法可以通過InputFormat()設定

  當資料傳送給map時,map會將輸入分片傳送到InputFormat,InputFormat則呼叫方法getRecordReader()生成RecordReaderRecordReader再通過creatKey()creatValue()方法建立可供map處理的<key,value>對。簡而言之,InputFormat()方法是用來生成可供map處理的<key,value>對的。

  Hadoop預定義了多種方法將不同型別的輸入資料轉化為map能夠處理的<key,value>對,它們都繼承自InputFormat,分別是:

 

    InputFormat

        |

        |---BaileyBorweinPlouffe.BbpInputFormat

        |---ComposableInputFormat

        |---CompositeInputFormat

        |---DBInputFormat

        |---DistSum.Machine.AbstractInputFormat

        |---FileInputFormat

            |---CombineFileInputFormat

            |---KeyValueTextInputFormat

            |---NLineInputFormat

            |---SequenceFileInputFormat

            |---TeraInputFormat

            |---TextInputFormat

 

  其中TextInputFormat是Hadoop預設的輸入方法,在TextInputFormat中,每個檔案(或其一部分)都會單獨地作為map的輸入,而這個是繼承自FileInputFormat的。之後,每行資料都會生成一條記錄,每條記錄則表示成<key,value>形式:

  • key值是每個資料的記錄在資料分片位元組偏移量,資料型別是LongWritable;  

value值是每行的內容,資料型別是Text

  (2)OutputFormat

  每一種格式都有一種格式與其對應。預設的輸出格式是TextOutputFormat,這種輸出方式與輸入類似,會將每條記錄以一行的形式存入文字檔案。不過,它的鍵和值可以是任意形式的,因為程式內容會呼叫toString()方法將鍵和值轉換為String型別再輸出。

 

  3)Map類中map方法分析

 

public static class Map extends MapReduceBase implements
        Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();
    public void map(LongWritable key, Text value,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        String line = value.toString();
        StringTokenizer tokenizer = new StringTokenizer(line);
        while (tokenizer.hasMoreTokens()) {
            word.set(tokenizer.nextToken());
            output.collect(word, one);
        }
    }

}

 

  Map類繼承自MapReduceBase,並且它實現了Mapper介面,此介面是一個規範型別,它有4種形式的引數,分別用來指定map的輸入key值型別、輸入value值型別、輸出key值型別和輸出value值型別。在本例中,因為使用的是TextInputFormat,它的輸出key值是LongWritable型別,輸出value值是Text型別,所以map的輸入型別為<LongWritable,Text>。在本例中需要輸出<word,1>這樣的形式,因此輸出的key值型別是Text,輸出的value值型別是IntWritable。

  實現此介面類還需要實現map方法,map方法會具體負責對輸入進行操作,在本例中,map方法對輸入的行以空格為單位進行切分,然後使用OutputCollect收集輸出的<word,1>。

 

  4)Reduce類中reduce方法分析

 

public static class Reduce extends MapReduceBase implements
        Reducer<Text, IntWritable, Text, IntWritable> {
    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter)
            throws IOException {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next().get();
        }
        output.collect(key, new IntWritable(sum));

    }

}

 

  Reduce類也是繼承自MapReduceBase的,需要實現Reducer介面。Reduce類以map的輸出作為輸入,因此Reduce的輸入型別是<Text,Intwritable>。而Reduce的輸出是單詞它的數目,因此,它的輸出型別是<Text,IntWritable>。Reduce類也要實現reduce方法,在此方法中,reduce函式將輸入的key值作為輸出的key值,然後將獲得多個value值加起來,作為輸出的值。

 

3.3 新的WordCount分析

  1)原始碼程式

 

package org.apache.hadoop.examples;

import java.io.IOException;

import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.mapreduce.Mapper;

import org.apache.hadoop.mapreduce.Reducer;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class WordCount {

  public static class TokenizerMapper

      extends Mapper<Object, Text, Text, IntWritable>{

      private final static IntWritable one = new IntWritable(1);

      private Text word = new Text();

 

      public void map(Object key, Text value, Context context)

        throws IOException, InterruptedException {

        StringTokenizer itr = new StringTokenizer(value.toString());

        while (itr.hasMoreTokens()) {

        word.set(itr.nextToken());

        context.write(word, one);

      }

    }

  }

  public static class IntSumReducer

      extends Reducer<Text,IntWritable,Text,IntWritable> {

      private IntWritable result = new IntWritable();

      public void reduce(Text key, Iterable<IntWritable> values,Context context)

           throws IOException, InterruptedException {

        int sum = 0;

        for (IntWritable val : values) {

           sum += val.get();

        }

      result.set(sum);

      context.write(key, result);

    }

  }

 

  public static void main(String[] args) throws Exception {

    Configuration conf = new Configuration();

    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

    if (otherArgs.length != 2) {

      System.err.println("Usage: wordcount <in> <out>");

      System.exit(2);

    }

    Job job = new Job(conf, "word count");

    job.setJarByClass(WordCount.class);

    job.setMapperClass(TokenizerMapper.class);

    job.setCombinerClass(IntSumReducer.class);

    job.setReducerClass(IntSumReducer.class);

    job.setOutputKeyClass(Text.class);

    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);

}

}

 

   1)Map過程

 

public static class TokenizerMapper

  extends Mapper<Object, Text, Text, IntWritable>{

  private final static IntWritable one = new IntWritable(1);

  private Text word = new Text();

  public void map(Object key, Text value, Context context)

    throws IOException, InterruptedException {

    StringTokenizer itr = new StringTokenizer(value.toString());

    while (itr.hasMoreTokens()) {

      word.set(itr.nextToken());

      context.write(word, one);

  }

}

 

  Map過程需要繼承org.apache.hadoop.mapreduce包中Mapper類,並重寫其map方法。通過在map方法中新增兩句把key值和value值輸出到控制檯的程式碼,可以發現map方法中value值儲存的是文字檔案中的一行(以回車符為行結束標記),而key值為該行的首字母相對於文字檔案的首地址的偏移量。然後StringTokenizer類將每一行拆分成為一個個的單詞,並將<word,1>作為map方法的結果輸出,其餘的工作都交有MapReduce框架處理。

 

  2)Reduce過程

 

public static class IntSumReducer

  extends Reducer<Text,IntWritable,Text,IntWritable> {

  private IntWritable result = new IntWritable();

  public void reduce(Text key, Iterable<IntWritable> values,Context context)

     throws IOException, InterruptedException {

    int sum = 0;

    for (IntWritable val : values) {

      sum += val.get();

    }

    result.set(sum);

    context.write(key, result);

  }

}

 

  Reduce過程需要繼承org.apache.hadoop.mapreduce包中Reducer類,並重寫其reduce方法。Map過程輸出<key,values>中key為單個單詞,而values是對應單詞的計數值所組成的列表,Map的輸出就是Reduce的輸入,所以reduce方法只要遍歷values並求和,即可得到某個單詞的總次數。

 

    3)執行MapReduce任務

 

public static void main(String[] args) throws Exception {

  Configuration conf = new Configuration();

  String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

  if (otherArgs.length != 2) {

    System.err.println("Usage: wordcount <in> <out>");

    System.exit(2);

  }

  Job job = new Job(conf, "word count");

  job.setJarByClass(WordCount.class);

  job.setMapperClass(TokenizerMapper.class);

  job.setCombinerClass(IntSumReducer.class);

  job.setReducerClass(IntSumReducer.class);

  job.setOutputKeyClass(Text.class);

  job.setOutputValueClass(IntWritable.class);

  FileInputFormat.addInputPath(job, new Path(otherArgs[0]));

  FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));

  System.exit(job.waitForCompletion(true) ? 0 : 1);

}

 

  在MapReduce中,由Job物件負責管理和執行一個計算任務,並通過Job的一些方法對任務的引數進行相關的設定。此處設定了使用TokenizerMapper完成Map過程中的處理和使用IntSumReducer完成Combine和Reduce過程中的處理。還設定了Map過程和Reduce過程的輸出型別:key的型別為Text,value的型別為IntWritable。任務的輸出和輸入路徑則由命令列引數指定,並由FileInputFormat和FileOutputFormat分別設定。完成相應任務的引數設定後,即可呼叫job.waitForCompletion()方法執行任務。

 

4、WordCount處理過程

  本節將對WordCount進行更詳細的講解。詳細執行步驟如下:

 

  1)將檔案拆分成splits,由於測試用的檔案較小,所以每個檔案為一個split,並將檔案按行分割形成<key,value>對,如圖4-1所示。這一步由MapReduce框架自動完成,其中偏移量(即key值)包括了回車所佔的字元數(Windows和Linux環境會不同)。

 

 image

圖4-1 分割過程

 

  2)將分割好的<key,value>對交給使用者定義的map方法進行處理,生成新的<key,value>對,如圖4-2所示。

 

 image

圖4-2 執行map方法

 

  3)得到map方法輸出的<key,value>對後,Mapper會將它們按照key值進行排序,並執行Combine過程,將key至相同value值累加,得到Mapper的最終輸出結果。如圖4-3所示。

 

 image

圖4-3 Map端排序及Combine過程

 

  4)Reducer先對從Mapper接收的資料進行排序,再交由使用者自定義的reduce方法進行處理,得到新的<key,value>對,並作為WordCount的輸出結果,如圖4-4所示。

 

 image

圖4-4 Reduce端排序及輸出結果

 

5、MapReduce新舊改變

  Hadoop最新版本的MapReduce Release 0.20.0的API包括了一個全新的Mapreduce JAVA API,有時候也稱為上下文物件。

  新的API型別上不相容以前的API,所以,以前的應用程式需要重寫才能使新的API發揮其作用 。

  新的API和舊的API之間有下面幾個明顯的區別。

  • 新的API傾向於使用抽象類,而不是介面,因為這更容易擴充套件。例如,你可以新增一個方法(用預設的實現)到一個抽象類而不需修改類之前的實現方法。在新的API中,Mapper和Reducer是抽象類。
  • 新的API是在org.apache.hadoop.mapreduce包(和子包)中的。之前版本的API則是放在org.apache.hadoop.mapred中的。
  • 新的API廣泛使用context object(上下文物件),並允許使用者程式碼與MapReduce系統進行通訊。例如,MapContext基本上充當著JobConf的OutputCollector和Reporter的角色。
  • 新的API同時支援"推"和"拉"式的迭代。在這兩個新老API中,鍵/值記錄對被推mapper中,但除此之外,新的API允許把記錄從map()方法中拉出,這也適用於reducer。"拉"式的一個有用的例子是分批處理記錄,而不是一個接一個。
  • 新的API統一了配置。舊的API有一個特殊的JobConf物件用於作業配置,這是一個對於Hadoop通常的Configuration物件的擴充套件。在新的API中,這種區別沒有了,所以作業配置通過Configuration來完成。作業控制的執行由Job類來負責,而不是JobClient,它在新的API中已經蕩然無存。

     

      文章下載地址:http://files.cnblogs.com/xia520pi/HadoopCluster_Vol.6.rar