1. 程式人生 > >大資料(十四):多job串聯與ReduceTask工作機制

大資料(十四):多job串聯與ReduceTask工作機制

一、多job串聯例項(倒索引排序)

1.需求

查詢每個單詞分別在每個檔案中出現的個數

 

預期第一次輸出(表示單詞分別在個個檔案中出現的次數)

apple--a.txt 3

apple--b.txt 1

apple--c.txt 1

grape--a.txt 4

grape--b.txt 3

grape--c.txt 1

pear--a.txt 1

pear--b.txt 2

pear--c.txt 2

預期第二次輸出

apple a.txt 3 b.txt 1 c.txt 1

grape a.txt 4 b.txt 3 c.txt 1

pear a.txt 1 b.txt 2 c.txt 2

 

2.編寫第一個Mapper程式碼

public class OneIndexMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    String name;
    private Text k = new Text();
    private IntWritable v = new IntWritable();
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        FileSplit split = (FileSplit)context.getInputSplit();
        name = split.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取一行資料
        String line = value.toString();
        //切割
        String[] fields = line.split(" ");
        for (String field : fields) {
            //拼接
            k.set(field+"--"+name);
            //輸出
            context.write(k,v);
        }
    }
}

3.編寫第一個Reducer程式碼

public class OneIndexReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //彙總
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        //輸出
        context.write(key, new IntWritable(sum));
    }
}

4.編寫第一個Driver程式碼

public class OneIndexDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 獲取job物件
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 設定jar包路徑
        job.setJarByClass(OneIndexDriver.class);

        // 3 管理mapper和reducer類
        job.setMapperClass(OneIndexMapper.class);
        job.setReducerClass(OneIndexReducer.class);

        // 4 設定mapper輸出的kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 設定最終輸出kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 設定輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

5.編寫第二個Mapper程式碼

public class TwoIndexMapper extends Mapper<LongWritable,Text,Text,Text> {
    Text k = new Text();
    Text v = new Text();
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //獲取一行資料
        String line = value.toString();
        //切割
        String[] fields = line.split("--");
        k.set(fields[0]);
        v.set(fields[1]);
        //輸出
        context.write(k,v);
    }
}

6.編寫第二個Reducer程式碼

public class TwoIndexReducer extends Reducer<Text,Text,Text,Text>{
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        //拼接
        StringBuilder sb =new StringBuilder();
        for (Text value : values) {
            sb.append(value.toString().replace("\t","-->")+"\t");
        }
        //輸出
        context.write(key,new Text(sb.toString()));
    }
}

7.編寫第二個Driver程式碼

public class TwoIndexDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1 獲取job物件
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // 2 設定jar包路徑
        job.setJarByClass(TwoIndexDriver.class);

        // 3 管理mapper和reducer類
        job.setMapperClass(TwoIndexMapper.class);
        job.setReducerClass(TwoIndexReducer.class);

        // 4 設定mapper輸出的kv型別
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        // 5 設定最終輸出kv型別
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        // 6 設定輸入輸出路徑
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 提交
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}

8.將第一個的輸出路徑作為第二個的輸入路徑

 

二、ReduceTask工作機制

1.設定ReduceTask並行度

        ReduceTask的並行度同樣影響整個job的執行併發度和執行效率,但與MapTask的併發數由切片數決定不同,ReduceTask數量的是個可以直接手動設定的

//預設值是1,手動設定為4
job.setNumReduceTasks(4);

2.注意點

  1. ReduceTask=0,表示沒有Reduce階段,輸出檔案個數和map個數一致

  2. ReduceTask預設值是1,所以輸出檔案個數是一個

  3. 如果資料分佈不均勻,可能在reduce階段出現數據傾斜

  4. ReduceTask數量並不是任意設定,還要考慮到業務邏輯需求,有些情況下,需要計算全域性彙總結果,就只能設定1個ReduceTask

  5. 具體的ReduceTask個數需要根據叢集效能而定

  6. 如果分割槽數不是一個,但是ReduceTask為1,將不會執行分割槽過程。原始碼中在分割槽步驟之前判斷了ReduceNum個數。

  1. Copy階段:ReduceTask從各中MapTask上遠端拷貝一片資料,並針對某一片資料,如果其帶下超過閾值,則寫到磁碟上,否則放入記憶體。

  2. Merge階段:在遠端拷貝資料的同時,ReduceTask啟動兩個後臺執行緒對記憶體和磁碟上的檔案進行合併,以防止記憶體使用過多或者磁碟上的檔案過多。

  3. Sort階段:按照MapReduce語義,使用者編寫reduce()函式輸入資料是按key進行聚集的一組資料。為了將key相同的資料聚集在一起,Hadoop採用了基於排序的策略。由於各個MapTask已經實現了對自己的處理結果進行了區域性排序,因此,ReduceTask只需要對所有資料進行一次歸併排序即可。

  4. Reduce階段:reduce()函式將計算結果寫到HDFS上。