1. 程式人生 > >【圖文解析 】MapReduce 示例程式編寫及編碼規範

【圖文解析 】MapReduce 示例程式編寫及編碼規範

上一步,我們查看了 WordCount 這個 MapReduce 程式的原始碼編寫,可以得出幾點結論:

1、 該程式有一個 main 方法,來啟動任務的執行,其中 job 物件就儲存了該程式執行的必要 資訊,比如指定 Mapper 類和 Reducer 類 job.setMapperClass(TokenizerMapper.class); job.setReducerClass(IntSumReducer.class);

2、 該程式中的 TokenizerMapper 類繼承了 Mapper 類

3、 該程式中的 IntSumReducer 類繼承了 Reducer 類    總結:MapReduce 程式的業務編碼分為兩個大部分,一部分配置程式的執行資訊,一部分 編寫該 MapReduce 程式的業務邏輯,並且業務邏輯的 map 階段和 reduce 階段的程式碼分別繼 承 Mapper 類和 Reducer 類 

  那麼現在就來編寫我們自己的 Wordcount 程式 按照上面的編碼規範,主體結構應該是這樣: 

MapReduce 程式編寫規範:

1、使用者編寫的程式分成三個部分:Mapper,Reducer,Driver(提交執行 MR 程式的客戶端)

2、Mapper 的輸入資料是 KV 對的形式(KV 的型別可自定義)

3、Mapper 的輸出資料是 KV 對的形式(KV 的型別可自定義)

4、Mapper 中的業務邏輯寫在 map()方法中

5、map()方法(maptask 程序)對每一個<K,V>呼叫一次

6、Reducer 的輸入資料型別對應 Mapper 的輸出資料型別,也是 KV 對的形式

7、Reducer 的業務邏輯寫在 reduce()方法中

8、Reducetask 程序對每一組相同 k 的<K,V>組呼叫一次 reduce()方法

9、使用者自定義的 Mapper 和 Reducer 都要繼承各自的父類

10、整個程式需要一個 Drvier 來進行提交,提交的是一個描述了各種必要資訊的 job 物件   WordCount 的業務邏輯:

1、 maptask 階段處理每個資料分塊的單詞統計分析,思路是每遇到一個單詞則把其轉換成 一個 key-value 對,比如單詞 hello,就轉換成<’hello’,1>傳送給 reducetask 去彙總

2、 reducetask 階段將接受 maptask 的結果,來做彙總計數    下面是具體實現,首先看 Map:   

    static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{          
    @Override  
    protected void map(LongWritable key, Text value,Context context)    
    throws IOException, InterruptedException {            

    // 計算任務程式碼:切割單詞,輸出每個單詞計 1 的 key-value 對   
    String[] words = value.toString().split(" ");   
    for(String word: words){    
    context.write(new Text(word), new IntWritable(1));   
        }  
    } 
} 

其次看 Reduce: 

 

static class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{ 
    @Override  
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)            
        throws IOException, InterruptedException {   
        // 彙總計算程式碼:對每個 key 相同的一組 key-value 做彙總統計   
        int sum = 0;   
        for(IntWritable v: values){    
        sum += v.get();   
}   
        context.write(key, new IntWritable(sum));  
        } 
    }

  在看 Job:     

public static void main(String[] args) throws Exception { 
    // 指定 hdfs 相關的引數  
    Configuration conf = new Configuration();  
    conf.set("fs.defaultFS", "hdfs://hadoop02:9000");          
    System.setProperty("HADOOP_USER_NAME", "hadoop"); 

    // 新建一個 job 任務  
    Job job = Job.getInstance(conf);    

    // 設定 jar 包所在路徑  
    job.setJarByClass(WordCountMR.class);    

    // 指定 mapper 類和 reducer 類  
    job.setMapperClass(WordCountMapper.class);      
    job.setReducerClass(WordCountReducer.class);   
    // 指定 maptask 的輸出型別  
    job.setMapOutputKeyClass(Text.class);  
    job.setMapOutputValueClass(IntWritable.class);    

    // 指定 reducetask 的輸出型別  
    job.setOutputKeyClass(Text.class);  
    job.setOutputValueClass(IntWritable.class);    

    // 指定該 mapreduce 程式資料的輸入和輸出路徑  
    Path inputPath = new Path("/wordcount/input");  
    Path outputPath = new Path("/wordcount/output");  
    FileInputFormat.setInputPaths(job, inputPath);  
    FileOutputFormat.setOutputPath(job, outputPath);    

    // 最後提交任務  
    boolean waitForCompletion = job.waitForCompletion(true);      
    System.exit(waitForCompletion?0:1); }