大資料教程(8.8)MR內部的shuffle過程詳解&combiner的執行機制及程式碼實現
阿新 • • 發佈:2018-12-05
之前的文章已經簡單介紹過mapreduce的運作流程,不過其內部的shuffle過程並未深入講解;本篇部落格將分享shuffle的全過程。
一、mapreduce運作流程長卷圖(其中[深]硃紅色代表是可以使用者自定義的部分,當然它們有預設實現)
二、shuffle過程中的combiner自定義實現
首先combiner元件有什麼作用呢?它可以減少我們在shuffle歸併排序是的次數、reduce階段處理的資料次數,同時可以有效提供程式的執行效率。
以下是wordcount使用combiner實現的程式碼
(1) maper實現:
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; /** * WordcountMapper.java的實現描述: KEYIN: 預設情況下,是mr框架所讀到的一行文字的起始偏移量,Long, * 但是在hadoop中有自己的更精簡的序列化介面,所以不直接用Long,而用LongWritable * VALUEIN:預設情況下,是mr框架所讀到的一行文字的內容,String,同上,用Text * KEYOUT:是使用者自定義邏輯處理完成之後輸出資料中的key,在此處是單詞,String,同上,用Text * VALUEOUT:是使用者自定義邏輯處理完成之後輸出資料中的value,在此處是單詞次數,Integer,同上,用IntWritable 類 * * @author arron 2018年12月4日 下午9:30:09 */ public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> { /** * map階段的業務邏輯就寫在自定義的map()方法中 maptask會對每一行輸入資料呼叫一次我們自定義的map()方法 */ @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //將maptask傳給我們的文字內容先轉換成String String line = value.toString(); //根據空格將這一行切分成單詞 String[] words = line.split(" "); //將單詞輸出為<單詞,1> for (String word : words) { //將單詞作為key,將次數1作為value,以便於後續的資料分發,可以根據單詞分發,以便於相同單詞會到相同的reduce task context.write(new Text(word), new IntWritable(1)); } } }
(2) reducer實現實現:
package com.empire.hadoop.mr.wccombinerdemo; import java.io.IOException; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; /** * 類 WordcountReducer.java的實現描述:KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT型別對應 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出資料型別 KEYOUT是單詞 VLAUEOUT是總次數 * * @author arron 2018年12月4日 下午9:51:15 */ public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> { /** * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1> * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1> * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1> * 入參key,是一組相同單詞kv對的key */ @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int count = 0; /* * Iterator<IntWritable> iterator = values.iterator(); * while(iterator.hasNext()){ count += iterator.next().get(); } */ for (IntWritable value : values) { count += value.get(); } context.write(key, new IntWritable(count)); } }
(3) combiner實現實現:
package com.empire.hadoop.mr.wccombinerdemo;
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
/**
* 類 WordcountCombiner.java的實現描述:輸如為map的輸出
*
* @author arron 2018年12月4日 下午9:29:25
*/
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int count = 0;
for (IntWritable v : values) {
count += v.get();
}
context.write(key, new IntWritable(count));
}
}
(4) mapreduce主程式驅動類實現:
package com.empire.hadoop.mr.wccombinerdemo;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* 類 WordcountDriver.java的實現描述:相當於一個yarn叢集的客戶端 需要在此封裝我們的mr程式的相關執行引數,指定jar包
* 最後提交給yarn
*
* @author arron 2018年12月4日 下午9:29:48
*/
public class WordcountDriver {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
//是否執行為本地模式,就是看這個引數值是否為local,預設就是local
/* conf.set("mapreduce.framework.name", "local"); */
//本地模式執行mr程式時,輸入輸出的資料可以在本地,也可以在hdfs上
//到底在哪裡,就看以下兩行配置你用哪行,預設就是file:///
/* conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */
/* conf.set("fs.defaultFS", "file:///"); */
//執行叢集模式,就是把程式提交到yarn中去執行
//要想執行為叢集模式,以下3個引數要指定為叢集上的值
/*
* conf.set("mapreduce.framework.name", "yarn");
* conf.set("yarn.resourcemanager.hostname", "mini1");
* conf.set("fs.defaultFS", "hdfs://mini1:9000/");
*/
Job job = Job.getInstance(conf);
job.setJar("c:/wc.jar");
//指定本程式的jar包所在的本地路徑
/* job.setJarByClass(WordcountDriver.class); */
//指定本業務job要使用的mapper/Reducer業務類
job.setMapperClass(WordcountMapper.class);
job.setReducerClass(WordcountReducer.class);
//指定mapper輸出資料的kv型別
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);
//指定最終輸出的資料的kv型別
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
//指定需要使用combiner,以及用哪個類作為combiner的邏輯
/* job.setCombinerClass(WordcountCombiner.class); */
job.setCombinerClass(WordcountReducer.class);
//如果不設定InputFormat,它預設用的是TextInputformat.class
job.setInputFormatClass(CombineTextInputFormat.class);
CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
CombineTextInputFormat.setMinInputSplitSize(job, 2097152);
//指定job的輸入原始檔案所在目錄
FileInputFormat.setInputPaths(job, new Path(args[0]));
//指定job的輸出結果所在目錄
FileOutputFormat.setOutputPath(job, new Path(args[1]));
//將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行
/* job.submit(); */
boolean res = job.waitForCompletion(true);
System.exit(res ? 0 : 1);
}
}
三、最後總結
雖然combiner元件在shuffle階段使用的話,可以提高程式效率;但是,它有一個使用限制條件,那就是不能影響最後的執行結果;例如:這裡講述一個反例,對多個輸入的數進行求平均數,如果此時使用combiner將不能得到正確的結果。
最後寄語,以上是博主本次文章的全部內容,如果大家覺得博主的文章還不錯,請點贊;如果您對博主其它伺服器大資料技術或者博主本人感興趣,請關注博主部落格,並且歡迎隨時跟博主溝通交流。