1. 程式人生 > >大資料求索(3):實戰MapReduce

大資料求索(3):實戰MapReduce

MapReduce 概述

主要用於離線、海量資料運算

WordCount編寫

下面這張經典圖很好地說明了如何編寫一個WordCount,也清楚說明了MapReduce的流程

mr-1

對於輸入的一個文字(可以存放在HDFS上,可以非常非常大),先對檔案進行拆分,假設這裡一行一份,對於每一行,按空格進行切分,然後給每個單詞賦初值為1,這裡同一個map裡有相同的單詞,也是不會覆蓋的,會保留兩個(word, 1),不同的map之間是沒有依賴關係的,是獨立的、並行的。shuffing階段是為了將相同的聚到一起。map的輸出會作為reduce的輸入,注意,這裡會對map的結果做排序,然後reduce階段進行求和,最終得到統計的詞頻。

MapReduce程式設計模型

框架會把輸入看做由鍵值對組成的集合,key和value都需要被框架所序列化,所以都要實現Writable介面,同時預設會進行排序,所以key還需要實現WritableComparable介面。

將作業拆分成map階段和reduce階段,執行map tasks和reduce tasks,MR的執行流程如下

(input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <lk3, v3>(output)

程式碼

有了上面的分析,可以直接進行編寫了,程式碼如下:

package org.wds.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import
org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.IOException; /** * 使用MapReduce開發wordcount應用程式 */ public class WordCount { /** * map: 讀取輸入的檔案 */ public static class MyMapper extends Mapper<LongWritable, Text, Text, LongWritable> { LongWritable one = new LongWritable(1); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { // 接收到的每一行資料 String line = value.toString(); // 按照分隔符拆分 String[] words = line.split(" "); for (String word : words) { // 通過上下文把map的處理結果輸出 context.write(new Text(word), one); } } } /** * Reduce:歸併操作 */ public static class MyReducer extends Reducer<Text, LongWritable, Text, LongWritable> { @Override protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException { long sum = 0; for (LongWritable value : values) { // 求key出現的次數總和 sum += value.get(); } // 最終統計結果的輸出 context.write(key, new LongWritable(sum)); } } /** * 定義Driver : 封裝了MapReduce作業的所有資訊 * @param args */ public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { // 設定 Configuration conf = new Configuration(); // 建立job Job job = Job.getInstance(conf, "wordcount"); // 設定job的處理類 job.setJarByClass(WordCount.class); // 設定作業處理的輸入路徑 FileInputFormat.setInputPaths(job, new Path(args[0])); // 設定Map相關引數 job.setMapperClass(MyMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 設定reduce相關引數 job.setReducerClass(MyReducer.class); job.setOutputKeyClass(Text.class); job.setMapOutputValueClass(LongWritable.class); // 設定作業處理的輸出路徑 FileOutputFormat.setOutputPath(job, new Path(args[1])); // 退出 System.exit(job.waitForCompletion(true) ? 0 : 1); } }

這裡輸入輸出是自己指定,輸出是到hdfs,然後使用maven把專案打包,上傳到伺服器,使用hadoop jar xx input output就可以運行了。

這裡還有些問題,就是多次運行同一個job會出現路徑已存在的錯誤,解決辦法便是每次執行前檢查路徑是否存在,存在則刪除,操作hdfs很容易實現,在main函式裡裡加入如下程式碼:

	   // 清理已存在的輸出目錄
        Path outpath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outpath)) {
            fileSystem.delete(outpath, true);
            System.out.println("output file exists, but it has deleted");
        }

MapReduce核心概念

  • Split

    交由MR作業來處理的資料庫,是MR中最小的計算單元,類比於HDFS的blocksize,blocksize是HDFS最小的儲存單元,預設是128M。

    預設情況下,兩者是一一對應的,當然也可以修改(不建議)。

  • InputFormat

    將輸入資料進行分片,將一個檔案拆分為多個split,底層呼叫的是InputSplit[] getSplits(JobConf job, int numSplits)

    比較常用的是TextInputFormat,用於處理文字

  • OutputFormat

    輸出

  • Combiner

    如下一個經典的圖可以很好地解釋

    mr-2

    對map的結果先進行合併,合併之後總共有4條資料,沒合併之前有9條,如果資料很大,這樣就能夠大大減少網路傳輸的消耗,相當於map在本地做了一個reduce。

    使用Combiner也非常簡單,直接在設定裡面加入如下程式碼

     // 通過job設定combiner處理類,其實邏輯上和reduce一模一樣
     // combiner使用場景是有限制的,比如求和、排序,但是求平均是錯誤的
            job.setCombinerClass(MyReducer.class);
    
  • Partitioner

Partitioner決定了MapTask輸出的資料交由哪個ReduceTask進行處理,預設情況下,是由分發的key的hash值對Reduce Task個數進行取模。