【圖文解析 】MapReduce 示例程式編寫及編碼規範
上一步,我們查看了 WordCount 這個 MapReduce 程式的原始碼編寫,可以得出幾點結論:
1、 該程式有一個 main 方法,來啟動任務的執行,其中 job 物件就儲存了該程式執行的必要 資訊,比如指定 Mapper 類和 Reducer 類 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);
2、 該程式中的 TokenizerMapper 類繼承了 Mapper 類
3、 該程式中的 IntSumReducer 類繼承了 Reducer 類 總結:MapReduce 程式的業務編碼分為兩個大部分,一部分配置程式的執行資訊,一部分 編寫該 MapReduce 程式的業務邏輯,並且業務邏輯的 map 階段和 reduce 階段的程式碼分別繼 承 Mapper 類和 Reducer 類
MapReduce 程式編寫規範:
1、使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(提交執行 MR 程式的客戶端)
2、Mapper 的輸入資料是 KV 對的形式(KV 的型別可自定義)
3、Mapper 的輸出資料是 KV 對的形式(KV 的型別可自定義)
4、Mapper 中的業務邏輯寫在 map()方法中
5、map()方法(maptask 程序)對每一個<K,V>呼叫一次
6、Reducer 的輸入資料型別對應 Mapper 的輸出資料型別,也是 KV 對的形式
7、Reducer 的業務邏輯寫在 reduce()方法中
8、Reducetask 程序對每一組相同 k 的<K,V>組呼叫一次 reduce()方法
9、使用者自定義的 Mapper 和 Reducer 都要繼承各自的父類
10、整個程式需要一個 Drvier 來進行提交,提交的是一個描述了各種必要資訊的 job 物件 WordCount 的業務邏輯:
1、 maptask 階段處理每個資料分塊的單詞統計分析,思路是每遇到一個單詞則把其轉換成 一個 key-value 對,比如單詞 hello,就轉換成<’hello’,1>傳送給 reducetask 去彙總
2、 reducetask 階段將接受 maptask 的結果,來做彙總計數 下面是具體實現,首先看 Map:
static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
@Override
protected void map(LongWritable key, Text value,Context context)
throws IOException, InterruptedException {
// 計算任務程式碼:切割單詞,輸出每個單詞計 1 的 key-value 對
String[] words = value.toString().split(" ");
for(String word: words){
context.write(new Text(word), new IntWritable(1));
}
}
}
其次看 Reduce:
static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
// 彙總計算程式碼:對每個 key 相同的一組 key-value 做彙總統計
int sum = 0;
for(IntWritable v: values){
sum += v.get();
}
context.write(key, new IntWritable(sum));
}
}
在看 Job:
public static void main(String[] args) throws Exception {
// 指定 hdfs 相關的引數
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
System.setProperty("HADOOP_USER_NAME", "hadoop");
// 新建一個 job 任務
Job job = Job.getInstance(conf);
// 設定 jar 包所在路徑
job.setJarByClass(WordCountMR.class);
// 指定 mapper 類和 reducer 類
job.setMapperClass(WordCountMapper.class);
job.setReducerClass(WordCountReducer.class);
// 指定 maptask 的輸出型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
// 指定 reducetask 的輸出型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
// 指定該 mapreduce 程式資料的輸入和輸出路徑
Path inputPath = new Path("/wordcount/input");
Path outputPath = new Path("/wordcount/output");
FileInputFormat.setInputPaths(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
// 最後提交任務
boolean waitForCompletion = job.waitForCompletion(true);
System.exit(waitForCompletion?0:1); }