MapReduce實戰 wordcount
昨天在自己的電腦上配置了hadoop,也運行了第一個MapReduce程式WordCount程式。但是對mapreduce的程式設計還很不清楚,在網上轉了一段對wordcount的解釋,轉載學習下。
Wordcount的輸入是資料夾,資料夾內是多個檔案,內容是以空格作分隔符的單詞序列,輸出為單詞,以及他們的數量。
首先,在mapreduce程式中,程式會按照setInputFormat中設定的方法為將輸入切分成一個個InputSplit。在Map過程中,程式會為每一個InputSplit呼叫map函式,這裡即以空格作分隔符將單詞切開。並以單詞作為key,1作為value。需要特別指出的是,mapreduce的<key,value>無論是key還是value都是mapreduce預先定義好的格式,因此在wordcount這個程式中,我們要把String轉換成text格式,int轉換為IntWritable格式。如下:
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
再做
word.set(tokenizer.nextToken());
將這些<key,value>對作為Map的結果傳遞下去
output.collect(word, one);
在Reduce過程中,程式會對每組<key,list of values>呼叫reduce函式,在我們這個程式中,只需讓value相加即可以。最後呼叫output.collect輸出Reduce結果。
以下是程式內容及註釋:
package com.felix;
import java.io.IOException;
import java.util.Iterator;
import java.util.StringTokenizer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
/**
*
* 描述:WordCount explains by Felix
* @author Hadoop Dev Group
*/
public class WordCount
{
/**
* MapReduceBase類:實現了Mapper和Reducer介面的基類(其中的方法只是實現介面,而未作任何事情)
* Mapper介面:
* WritableComparable介面:實現WritableComparable的類可以相互比較。所有被用作key的類應該實現此介面。
* Reporter 則可用於報告整個應用的執行進度,本例中未使用。
*
*/
public static class Map extends MapReduceBase implements
Mapper<LongWritable, Text, Text, IntWritable> //設定了map函式輸入的形式為longwritable<key>text<value>輸出地形式為text<key>intwritable<value>
{
/**
* LongWritable, IntWritable, Text 均是 Hadoop 中實現的用於封裝 Java 資料型別的類,這些類實現了WritableComparable介面,
* 都能夠被序列化從而便於在分散式環境中進行資料交換,你可以將它們分別視為long,int,String 的替代品。
*/
private final static IntWritable one = new IntWritable(1); //定義一個intwritable型的常量,用來說明出現過一次
private Text word = new Text(); //定義一個text型的變數,用來儲存單詞
/**
* Mapper介面中的map方法:
* void map(K1 key, V1 value, OutputCollector<K2,V2> output, Reporter reporter)
* 對映一個單個的輸入k/v對到一箇中間的k/v對
* 輸出對不需要和輸入對是相同的型別,輸入對可以對映到0個或多個輸出對。
* OutputCollector介面:收集Mapper和Reducer輸出的<k,v>對。
* OutputCollector介面的collect(k, v)方法:增加一個(k,v)對到output
*/
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter) //map中的參變數說明map輸入時的keyvalue對的形式,以及map輸出和reduce接收的keyvalue資料型別
throws IOException
{
String line = value.toString(); //將輸入中的一行儲存到line中
StringTokenizer tokenizer = new StringTokenizer(line); //將一行儲存到準備切詞的工具中
while (tokenizer.hasMoreTokens()) //判斷是否到一行的結束
{
word.set(tokenizer.nextToken()); //設定key即word的值為從每一行切下來的單詞
output.collect(word, one); //設定map函式輸出的keyvalue對
}
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> //設定reduce函式中輸入對的資料型別是text和intwritable,輸出對的資料型別是text和intwritable
{
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, IntWritable> output, Reporter reporter) //設定reduce函式中輸入對的資料型別是text和intwritable,輸出對的資料型別是text和intwritable
throws IOException
{
int sum = 0;
while (values.hasNext()) //計算同一個key下,所有value的總和
{
sum += values.next().get(); //獲取下一個value的值
}
output.collect(key, new IntWritable(sum)); //收集reduce輸出結果
}
}
public static void main(String[] args) throws Exception
{
/**
* JobConf:map/reduce的job配置類,向hadoop框架描述map-reduce執行的工作
* 構造方法:JobConf()、JobConf(Class exampleClass)、JobConf(Configuration conf)等
*/
JobConf conf = new JobConf(WordCount.class);
conf.setJobName("wordcount"); //設定一個使用者定義的job名稱
conf.setOutputKeyClass(Text.class); //為job的輸出資料設定Key類
conf.setOutputValueClass(IntWritable.class); //為job輸出設定value類
conf.setMapperClass(Map.class); //為job設定Mapper類
conf.setCombinerClass(Reduce.class); //為job設定Combiner類
conf.setReducerClass(Reduce.class); //為job設定Reduce類
conf.setInputFormat(TextInputFormat.class); //為map-reduce任務設定InputFormat實現類
conf.setOutputFormat(TextOutputFormat.class); //為map-reduce任務設定OutputFormat實現類
/**
* InputFormat描述map-reduce中對job的輸入定義
* setInputPaths():為map-reduce job設定路徑陣列作為輸入列表
* setInputPath():為map-reduce job設定路徑陣列作為輸出列表
*/
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf); //執行一個job
}
}