1. 程式人生 > >從 WordCount 到 MapReduce 計算模型

從 WordCount 到 MapReduce 計算模型

概述

雖然現在都在說大記憶體時代,不過記憶體的發展怎麼也跟不上資料的步伐吧。所以,我們就要想辦法減小資料量。這裡說的減小可不是真的減小資料量,而是讓資料分散開來。分開儲存、分開計算。這就是 MapReduce 分散式的核心。

版權說明

目錄

MapReduce 簡介

要了解 MapReduce,首先要了解 MapReduce 的載體是什麼。在 Hadoop 中,用於執行 MapReduce 任務的機器有兩個角色:一個是 JobTracker,另一個是 TaskTracker。JobTracker 是用於管理和排程工作的,TaskTracker 是用於執行工作的。一個 Hadoop 叢集中只有一臺 JobTracker(當然在 Hadoop 2.x 中,一個 Hadoop 叢集中可能有多個 JobTracker)。

MapReduce 原理

MapReduce 模型的精髓在於它的演算法思想——分治。對於分治的過程可以參見我之前的一篇部落格《大資料演算法:對5億資料進行排序》。還有就是可以去學習一下排序演算法中的歸併排序,在這個排序演算法中就是基於分治思想的。
迴歸正題,在 MapReduce 模型中,可以把分治的這一概念表現得淋漓盡致。在處理大量資料的時候(比如說 1 TB,你別說沒有這麼多的資料,大公司這點資料也不算啥的),如果只是單純地依賴我們的硬體,就顯得有些力不從心了。首先我們的記憶體沒有那麼大,如放在磁碟上處理,那麼過多的 IO 操作無疑是一個死穴。聰明的 Google 工程師總是給我們這些渣渣帶來驚喜,他們想把了把這些資料分散到許多機器上,在這些機器上完成一些初步的計算,再經過一系列的彙總,最後在我們的機器上(Master/Namenode)統計結果。
要知道我們不可能把我們的資料分散到隨意的 N 臺機器上。那麼我們就必須讓這些機器之間建立一種可靠的關聯,這樣的關聯形成了一個計算機叢集。這樣我們的資料就可以分發到叢集中的各個計算機上了。在 Hadoop 裡這一操作可以通過 -put

這一指令實現,關於這一點在下面的操作過程中也有體現。
當資料被上傳到 Hadoop 的 HDFS 檔案系統上之後,就可以通過 MapReduce 模型中的 Mapper 先將資料讀進記憶體,過程像下面這樣:
這裡寫圖片描述

經過 Mapper 的處理,資料會變成這樣
這裡寫圖片描述

好了,到了這裡,Map 的過程就已經結束了。接下來就是 Reduce 的過程了。

這裡寫圖片描述

可以看到這裡有一個 conbin 的過程,這個過程,也可以沒有的。而有的時候是一定不能有的,在後面我們可以會單獨來說說這裡的 conbin,不過不是本文的內容,就不詳述了。
這樣整個 MapReduce 過程就已經 over 了,下面看看具體的實現及測試結果吧。

WordCount 程式

需求分析

  1. 現在有大量的檔案
  2. 每個檔案又有大量的單詞
  3. 要求統計每個單詞的詞頻

邏輯實現

Mapper

public static class CoreMapper extends Mapper<Object, Text, Text, IntWritable> {

        private static final IntWritable one = new IntWritable(1);
        private static Text label = new Text();

        @Override
        protected void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while(tokenizer.hasMoreTokens()) {
                label.set(tokenizer.nextToken());
                context.write(label, one);
            }
        }
    }

Reducer

public static class CoreReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable count = new IntWritable();

        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                Reducer<Text, IntWritable, Text, IntWritable>.Context context)
                throws IOException, InterruptedException {
            if (null == values) {
                return;
            }

            int sum = 0;
            for (IntWritable intWritable : values) {
                sum += intWritable.get();
            }
            count.set(sum);

            context.write(key, count);
        }
    }

Client

public class ComputerClient extends Configuration implements Tool {

    public static void main(String[] args) {
        ComputerClient client = new ComputerClient();

        args = new String[] {
                AppConstant.INPUT,
                AppConstant.OUTPUT
        };

        try {
            ToolRunner.run(client, args);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public Configuration getConf() {
        return this;
    }

    @Override
    public void setConf(Configuration arg0) {
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = new Job(getConf(), "ComputerClient-job");
        job.setJarByClass(CoreComputer.class);

        job.setMapperClass(CoreComputer.CoreMapper.class);
        job.setCombinerClass(CoreComputer.CoreReducer.class);
        job.setReducerClass(CoreComputer.CoreReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        return job.waitForCompletion(true) ? 0 : 1;
    }
}

本地執行

關於本地執行沒什麼好說的,就是在 Eclipse 裡配置好執行引數或是直接在程式碼裡指定輸入輸出路徑。然後 Run As 一個 Hadoop 程式即可。

分散式執行

在分散式執行 MapReduce 的過程中,主要有以下幾個步驟:
1. 打包
2. 上傳源資料
3. 分散式執行

打包

在打包的過程中,可以使用命令列打包,也可以使用 Eclipse 自帶的 Export。在 Eclipse 的打包匯出過程中,與打包匯出一個 Java 的 jar 過程是一樣的。這裡就不多說了。假設我們打成的 jar 包為: job.jar

上傳源資料

上傳源資料是指將本地的資料上傳到 HDFS 檔案系統上。
在上傳源資料之前我們需要在 HDFS 上新建你需要上傳的目標路徑,然後使用下面的這條指令即可完成資料的上傳。

$ hadoop fs -mkdir <hdfs_input_path>
$ hadoop fs -put <local_path> <hdfs_input_path>

如果這裡之前你不進行建立目錄,上傳過程會因為找不到目錄而出現異常情況。
資料上傳完成後,這些資料會分佈在你整個叢集的 DataNode 上,而不只是在你的本地機器上了。

分散式執行

等上面的所有事情已經就緒,那麼就可以使用下面的 hadoop 指令執行我們的 hadoop 程式。

$ hadoop jar job.jar <hdfs_input_path> <hdfs_output_path>

結果視窗

開啟瀏覽器
這裡是程式中執行的過程中,進度的變化情況
這裡寫圖片描述
下面是程式執行完成時的網頁截圖

這裡寫圖片描述

這裡寫圖片描述

Ref

  • 《Hadoop 實戰》