
MapReduce: Analyzing the Mapper and Reducer Principles Through WordCount

Mapper Phase

package com.zyd.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * Input key: LongWritable, the byte offset of the line within the file
 * Input value: Text (a serializable String type), the content of one line
 * Output key: Text, a single word
 * Output value: IntWritable, the count for that word
 */
public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    // Reuse these objects instead of creating new ones on every call
    Text k = new Text();
    IntWritable v = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Convert the line to a String (the value arrives as a Text object)
        String line = value.toString();
        // 2. Split the line into individual words on spaces
        String[] words = line.split(" ");
        // 3. Emit each word to the next stage in the form <word, 1>
        for (String word : words){
            // Reuse the k and v objects created outside this method to avoid
            // allocating a new object for every word
            k.set(word);
            context.write(k,v);
        }
    }
}
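
To make the data flow concrete, here is a hypothetical input line and the <word, 1> pairs the mapper above would emit for it (the byte-offset key and the sample text are only illustrative):

Input record (key = byte offset, value = one line of the file):
    <0, "hello world hello hadoop">

Mapper output:
    <hello, 1>
    <world, 1>
    <hello, 1>
    <hadoop, 1>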

Reducer Phase

package com.zyd.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * The Reducer's input is the Mapper's output, so the serialization types must match
 */
public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    /**
     * Values that share the same key are aggregated here.
     * Each key arrives with multiple <word, 1> values, so they are passed in as an Iterable.
     * context: used to write the output to the next stage
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Sum the occurrences of this word
        int sum = 0;
        for (IntWritable count : values) {
            sum += count.get();
        }

        // Write out the total count for the word
        context.write(key, new IntWritable(sum));
    }
}
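
Between the map and reduce phases, the framework's shuffle and sort groups all <word, 1> pairs by key, so reduce() is called once per distinct word with an Iterable of its counts. Continuing the hypothetical line above, the reducer would receive and emit:

Reducer input (grouped by key after shuffle and sort):
    <hadoop, [1]>
    <hello, [1, 1]>
    <world, [1]>

Reducer output:
    <hadoop, 1>
    <hello, 2>
    <world, 1>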

Driver Class

package com.zyd.wc;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCountRunner {
    public static void main (String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 1. Get the configuration and create the Job instance
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        // 6. Specify where the jar containing this program lives
        // job.setJar("/home/wc.jar"); hard-coding a local path is fragile because it differs between environments
        // setJarByClass lets the framework locate the jar automatically
        job.setJarByClass(WordCountRunner.class);

        // 2. Specify the Mapper and Reducer classes used by this job
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        // 3. Specify the key/value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 4. Specify the key/value types of the final output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 5. Specify the job's input directory and output directory
        FileInputFormat.setInputPaths(job,new Path(args[0]));
        FileOutputFormat.setOutputPath(job,new Path(args[1]));

        // 7. Submit the job configuration, along with the jar containing the
        //    job's Java classes, to YARN for execution
        boolean result = job.waitForCompletion(true);
        System.exit(result ? 0 : 1);
    }
}
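
To run the job, package the three classes into a jar and submit it with the hadoop command. The jar name and the HDFS input/output paths below are placeholders; note that the output directory must not already exist, or the job will fail:

hadoop jar wc.jar com.zyd.wc.WordCountRunner /user/zyd/input /user/zyd/output
hdfs dfs -cat /user/zyd/output/part-r-00000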