1. 程式人生 > >MapReduce實戰 wordcount

MapReduce實戰 wordcount

昨天在自己的電腦上配置了hadoop,也運行了第一個MapReduce程式WordCount程式。但是對mapreduce的程式設計還很不清楚,在網上轉了一段對wordcount的解釋,轉載學習下。

Wordcount的輸入是資料夾,資料夾內是多個檔案,內容是以空格作分隔符的單詞序列,輸出為單詞,以及他們的數量。

首先,在mapreduce程式中,程式會按照setInputFormat中設定的方法為將輸入切分成一個個InputSplit。在Map過程中,程式會為每一個InputSplit呼叫map函式,這裡即以空格作分隔符將單詞切開。並以單詞作為key,1作為value。需要特別指出的是,mapreduce的<key,value>無論是key還是value都是mapreduce預先定義好的格式,因此在wordcount這個程式中,我們要把String轉換成text格式,int轉換為IntWritable格式。如下:

private final static IntWritable one = new IntWritable(1);

private Text word = new Text();

再做

word.set(tokenizer.nextToken());                         

將這些<key,value>對作為Map的結果傳遞下去

output.collect(word, one);

在Reduce過程中,程式會對每組<key,list of values>呼叫reduce函式,在我們這個程式中,只需讓value相加即可以。最後呼叫output.collect輸出Reduce結果。

以下是程式內容及註釋:

package com.felix;  

import java.io.IOException;  

import java.util.Iterator;  

import java.util.StringTokenizer;  

import org.apache.hadoop.fs.Path;  

import org.apache.hadoop.io.IntWritable;  

import org.apache.hadoop.io.LongWritable;  

import org.apache.hadoop.io.Text;  

import org.apache.hadoop.mapred.FileInputFormat;  

import org.apache.hadoop.mapred.FileOutputFormat;  

import org.apache.hadoop.mapred.JobClient;  

import org.apache.hadoop.mapred.JobConf;  

import org.apache.hadoop.mapred.MapReduceBase;  

import org.apache.hadoop.mapred.Mapper;  

import org.apache.hadoop.mapred.OutputCollector;  

import org.apache.hadoop.mapred.Reducer;  

import org.apache.hadoop.mapred.Reporter;  

import org.apache.hadoop.mapred.TextInputFormat;  

import org.apache.hadoop.mapred.TextOutputFormat;  

/** 

 *  

 * 描述:WordCount explains by Felix 

 * @author Hadoop Dev Group 

 */ 

public class WordCount  

{  

    /** 

     * MapReduceBase類:實現了Mapper和Reducer介面的基類(其中的方法只是實現介面,而未作任何事情) 

     * Mapper介面: 

     * WritableComparable介面:實現WritableComparable的類可以相互比較。所有被用作key的類應該實現此介面。 

     * Reporter 則可用於報告整個應用的執行進度,本例中未使用。  

     *  

     */ 

    public static class Map extends MapReduceBase implements 

            Mapper<LongWritable, Text, Text, IntWritable>   //設定了map函式輸入的形式為longwritable<key>text<value>輸出地形式為text<key>intwritable<value>

    {  

        /** 

         * LongWritable, IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 資料型別的類,這些類實現了WritableComparable介面, 

         * 都能夠被序列化從而便於在分散式環境中進行資料交換,你可以將它們分別視為long,int,String 的替代品。 

         */ 

        private final static IntWritable one = new IntWritable(1);   //定義一個intwritable型的常量,用來說明出現過一次

        private Text word = new Text();                        //定義一個text型的變數,用來儲存單詞

        /** 

         * Mapper介面中的map方法: 

         * void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter) 

         * 對映一個單個的輸入k/v對到一箇中間的k/v對 

         * 輸出對不需要和輸入對是相同的型別,輸入對可以對映到0個或多個輸出對。 

         * OutputCollector介面:收集Mapper和Reducer輸出的<k,v>對。 

         * OutputCollector介面的collect(k, v)方法:增加一個(k,v)對到output 

         */ 

        public void map(LongWritable key, Text value,  

                OutputCollector<Text, IntWritable> output, Reporter reporter)   //map中的參變數說明map輸入時的keyvalue對的形式,以及map輸出和reduce接收的keyvalue資料型別

                throws IOException  

        {  

            String line = value.toString();   //將輸入中的一行儲存到line中

           StringTokenizer tokenizer = new StringTokenizer(line);   //將一行儲存到準備切詞的工具中

            while (tokenizer.hasMoreTokens())   //判斷是否到一行的結束

            {  

                word.set(tokenizer.nextToken());  //設定key即word的值為從每一行切下來的單詞  

                output.collect(word, one);       //設定map函式輸出的keyvalue對

            }  

        }  

    }  

public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>        //設定reduce函式中輸入對的資料型別是text和intwritable,輸出對的資料型別是text和intwritable

    {  

        public void reduce(Text key, Iterator<IntWritable> values,  

                OutputCollector<Text, IntWritable> output, Reporter reporter)   //設定reduce函式中輸入對的資料型別是text和intwritable,輸出對的資料型別是text和intwritable

                throws IOException  

        {  

            int sum = 0;  

            while (values.hasNext())        //計算同一個key下,所有value的總和

            {  

                sum += values.next().get();   //獲取下一個value的值

            }  

            output.collect(key, new IntWritable(sum));   //收集reduce輸出結果

        }  

    }  

    public static void main(String[] args) throws Exception  

    {  

        /** 

         * JobConf:map/reduce的job配置類,向hadoop框架描述map-reduce執行的工作 

         * 構造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等 

         */ 

        JobConf conf = new JobConf(WordCount.class);  

        conf.setJobName("wordcount");           //設定一個使用者定義的job名稱  

        conf.setOutputKeyClass(Text.class);    //為job的輸出資料設定Key類  

        conf.setOutputValueClass(IntWritable.class);   //為job輸出設定value類  

        conf.setMapperClass(Map.class);         //為job設定Mapper類  

        conf.setCombinerClass(Reduce.class);      //為job設定Combiner類  

        conf.setReducerClass(Reduce.class);        //為job設定Reduce類  

        conf.setInputFormat(TextInputFormat.class);    //為map-reduce任務設定InputFormat實現類  

        conf.setOutputFormat(TextOutputFormat.class);  //為map-reduce任務設定OutputFormat實現類  

       /** 

         * InputFormat描述map-reduce中對job的輸入定義 

         * setInputPaths():為map-reduce job設定路徑陣列作為輸入列表 

         * setInputPath():為map-reduce job設定路徑陣列作為輸出列表 

         */ 

        FileInputFormat.setInputPaths(conf, new Path(args[0]));  

        FileOutputFormat.setOutputPath(conf, new Path(args[1]));  

        JobClient.runJob(conf);         //執行一個job  

    }