1. 程式人生 > >大資料教程(8.8)MR內部的shuffle過程詳解&combiner的執行機制及程式碼實現

大資料教程(8.8)MR內部的shuffle過程詳解&combiner的執行機制及程式碼實現

        之前的文章已經簡單介紹過mapreduce的運作流程,不過其內部的shuffle過程並未深入講解;本篇部落格將分享shuffle的全過程。

        一、mapreduce運作流程長卷圖(其中[深]硃紅色代表是可以使用者自定義的部分,當然它們有預設實現)

        二、shuffle過程中的combiner自定義實現

               首先combiner元件有什麼作用呢?它可以減少我們在shuffle歸併排序是的次數、reduce階段處理的資料次數,同時可以有效提供程式的執行效率。

               以下是wordcount使用combiner實現的程式碼

               (1) maper實現:

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * WordcountMapper.java的實現描述: KEYIN: 預設情況下,是mr框架所讀到的一行文字的起始偏移量,Long,
 * 但是在hadoop中有自己的更精簡的序列化介面,所以不直接用Long,而用LongWritable
 * VALUEIN:預設情況下,是mr框架所讀到的一行文字的內容,String,同上,用Text
 * KEYOUT:是使用者自定義邏輯處理完成之後輸出資料中的key,在此處是單詞,String,同上,用Text
 * VALUEOUT:是使用者自定義邏輯處理完成之後輸出資料中的value,在此處是單詞次數,Integer,同上,用IntWritable 類
 * 
 * @author arron 2018年12月4日 下午9:30:09
 */

public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    /**
     * map階段的業務邏輯就寫在自定義的map()方法中 maptask會對每一行輸入資料呼叫一次我們自定義的map()方法
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        //將maptask傳給我們的文字內容先轉換成String
        String line = value.toString();
        //根據空格將這一行切分成單詞
        String[] words = line.split(" ");

        //將單詞輸出為<單詞,1>
        for (String word : words) {
            //將單詞作為key,將次數1作為value,以便於後續的資料分發,可以根據單詞分發,以便於相同單詞會到相同的reduce task
            context.write(new Text(word), new IntWritable(1));
        }

    }

}

               (2) reducer實現實現:

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 類 WordcountReducer.java的實現描述:KEYIN, VALUEIN 對應 mapper輸出的KEYOUT,VALUEOUT型別對應
 * KEYOUT, VALUEOUT 是自定義reduce邏輯處理結果的輸出資料型別 KEYOUT是單詞 VLAUEOUT是總次數
 * 
 * @author arron 2018年12月4日 下午9:51:15
 */
public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    /**
     * <angelababy,1><angelababy,1><angelababy,1><angelababy,1><angelababy,1>
     * <hello,1><hello,1><hello,1><hello,1><hello,1><hello,1>
     * <banana,1><banana,1><banana,1><banana,1><banana,1><banana,1>
     * 入參key,是一組相同單詞kv對的key
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;
        /*
         * Iterator<IntWritable> iterator = values.iterator();
         * while(iterator.hasNext()){ count += iterator.next().get(); }
         */

        for (IntWritable value : values) {

            count += value.get();
        }

        context.write(key, new IntWritable(count));

    }

}

               (3) combiner實現實現:

package com.empire.hadoop.mr.wccombinerdemo;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

/**
 * 類 WordcountCombiner.java的實現描述:輸如為map的輸出
 * 
 * @author arron 2018年12月4日 下午9:29:25
 */
public class WordcountCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {

        int count = 0;
        for (IntWritable v : values) {

            count += v.get();
        }

        context.write(key, new IntWritable(count));

    }

}

               (4) mapreduce主程式驅動類實現:

package com.empire.hadoop.mr.wccombinerdemo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

/**
 * 類 WordcountDriver.java的實現描述:相當於一個yarn叢集的客戶端 需要在此封裝我們的mr程式的相關執行引數,指定jar包
 * 最後提交給yarn
 * 
 * @author arron 2018年12月4日 下午9:29:48
 */
public class WordcountDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        //是否執行為本地模式,就是看這個引數值是否為local,預設就是local
        /* conf.set("mapreduce.framework.name", "local"); */

        //本地模式執行mr程式時,輸入輸出的資料可以在本地,也可以在hdfs上
        //到底在哪裡,就看以下兩行配置你用哪行,預設就是file:///
        /* conf.set("fs.defaultFS", "hdfs://mini1:9000/"); */
        /* conf.set("fs.defaultFS", "file:///"); */

        //執行叢集模式,就是把程式提交到yarn中去執行
        //要想執行為叢集模式,以下3個引數要指定為叢集上的值
        /*
         * conf.set("mapreduce.framework.name", "yarn");
         * conf.set("yarn.resourcemanager.hostname", "mini1");
         * conf.set("fs.defaultFS", "hdfs://mini1:9000/");
         */
        Job job = Job.getInstance(conf);

        job.setJar("c:/wc.jar");
        //指定本程式的jar包所在的本地路徑
        /* job.setJarByClass(WordcountDriver.class); */

        //指定本業務job要使用的mapper/Reducer業務類
        job.setMapperClass(WordcountMapper.class);
        job.setReducerClass(WordcountReducer.class);

        //指定mapper輸出資料的kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        //指定最終輸出的資料的kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        //指定需要使用combiner,以及用哪個類作為combiner的邏輯
        /* job.setCombinerClass(WordcountCombiner.class); */
        job.setCombinerClass(WordcountReducer.class);

        //如果不設定InputFormat,它預設用的是TextInputformat.class
        job.setInputFormatClass(CombineTextInputFormat.class);
        CombineTextInputFormat.setMaxInputSplitSize(job, 4194304);
        CombineTextInputFormat.setMinInputSplitSize(job, 2097152);

        //指定job的輸入原始檔案所在目錄
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        //指定job的輸出結果所在目錄
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        //將job中配置的相關引數,以及job所用的java類所在的jar包,提交給yarn去執行
        /* job.submit(); */
        boolean res = job.waitForCompletion(true);
        System.exit(res ? 0 : 1);

    }

}

        三、最後總結

               雖然combiner元件在shuffle階段使用的話,可以提高程式效率;但是,它有一個使用限制條件,那就是不能影響最後的執行結果;例如:這裡講述一個反例,對多個輸入的數進行求平均數,如果此時使用combiner將不能得到正確的結果。       

        最後寄語,以上是博主本次文章的全部內容,如果大家覺得博主的文章還不錯,請點贊;如果您對博主其它伺服器大資料技術或者博主本人感興趣,請關注博主部落格,並且歡迎隨時跟博主溝通交流。