1. 程式人生 > >MapReduce 程式設計模型 & WordCount 示例

MapReduce 程式設計模型 & WordCount 示例

 

學習大資料接觸到的第一個程式設計思想 MapReduce。

 

前言

之前在學習大資料的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大資料或者說正在學習大資料的朋友。如果你看到裡面的東西,讓你知道了它,這也是一種進步嘛。說不定就開啟了你的另一扇大門呢?

 

先來看一個問題

在講 MapReduce 之前,我們先來看一個問題。我們都知道,在大資料場景中,最先讓人瞭解到的就是資料量大。當資料量大了以後,我們在處理一些問題的時候,可能就沒辦法按照以前我們傳統的方式去解決問題。

我們以一個簡單的單詞計數來看這個問題。

比如現在我們有一個檔案,就10M,裡面存放的是一篇英文文件,我們現在的需求就是計算單詞出現的次數。

按照我們以前寫 Java 程式碼的套路來做,大概就是讀取檔案,把資料載入到記憶體,然後new 一個map來存最後的結果。key 就是單詞,value 就是單詞出現的次數。

 然後從檔案中讀取一行資料,然後對這行資料按空格進行切割,然後對切割後的一個一個的單詞進行處理,看map 中是否存在,存在就 value + 1,不存在就設定 value 為 1 。

 然後再讀取一行資料重複上面的操作,直到結束。很簡單吧。

是的,沒問題,剛才檔案是 10M,處理完成秒秒鐘的事情,但是現在我的檔案是 2T 的大小,看清楚呃,是兩個 T 的檔案需要處理,那你現在要怎麼做?還去載入到記憶體麼?

想想你公司的機器配置,記憶體多大,8G,16G,32G ...,頂起天 128G 吧。先不說多大,再想想現在記憶體價格是多少,128G 的記憶體得花多少錢。很顯然,現在這麼玩兒,玩不了吧。

但是,現在一般你公司的機器都還是有不少臺吧。那麼如果說我們現在把這些機器組成一個 N 節點的叢集,然後把這 2T 的檔案切分成很多個小檔案,然後丟到這些機器上面去計算執行統計,最後再進行一個彙總,是不是就解決了上面的記憶體不足的問題。

 

MapReduce 思想

MapReduce 是一種程式設計模型,用於大規模資料集(大於1TB)的並行運算,源於 Google 一篇論文,它充分借鑑了 “分而治之” 的思想,將一個數據處理過程拆分為主要的Map(對映)與Reduce(化簡)兩步。

對比上面的例子來說,Map 階段就是每個機器處理切好的資料片的階段,Reduce 階段則是最後統計彙總的階段。

那麼,針對前面說的例子大概可以用下面這個圖來描述它:

 

簡單說一下上面的思路:

第一步:把兩個T 的檔案分成若干個檔案塊(block)分散存在整個叢集上,比如128M 一個。

第二步:在每臺機器上執行一個map task 任務,分別對自己機器上的檔案進行統計:

1.先把資料載入進記憶體,然後一行一行的對資料進行讀取,按照空格來進行切割。

2.用一個 HashMap 來儲存資料,內容為 <單詞,數量>

3.當自己本地的資料處理完成以後,將資料進行輸出準備

4.輸出資料之前,先把HashMap 按照首字母範圍分成 3 個HashMap5.將3個 HashMap 分別傳送給 3個 Reduce task 進行處理,分發的時候,同一段單詞的資料,就會進入同一個 Reduce task 進行處理,保證資料統計的完整性。

第三步: Reduce task 把收到的資料進行彙總,然後輸出到 hdfs 檔案系統程序儲存。

上面的過程可能遇到的問題

上面我們只是關心了我們業務邏輯的實現,其實系統一旦做成分散式以後,會面臨非常多的複雜問題,比如:

•你的 Map task 如何進行任務分配?

•你的 Reduce task 如何分配要處理的資料任務?

•Map task 和 Reduce task 之間如何進行銜接,什麼時候去啟動Reduce Task 呀?

•如果 Map task 執行失敗了,怎麼處理?

•Map task 還要去維護自己要傳送的資料分割槽,是不是也太麻煩了。

•等等等等等

 

為什麼要用 MapReduce

可見在程式由單機版擴成分散式時,會引入大量的複雜工作。為了提高開發效率,可以將分散式程式中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。

而 MapReduce 就是這樣一個分散式程式的通用框架。

 

WordCount 示例

用一個程式碼示例來演示,它需要3個東西,一個是map task ,一個是 reduce task ,還有就是啟動類,不然怎麼關聯他們的關係呢。

首先是 map task :

package com.zhouq.mr;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Mapper;import java.io.IOException;/** * KEYIN 預設情況下,是MR 框架中讀取到的一行文字的起始偏移量,long 型別 * 在hadoop 中有自己更精簡的序列化介面,我們不直接用Long ,而是用 LongWritable * VALUEIN : 預設情況下,是MR 中讀取到的一行文字內容,String ,也有自己的型別 Text 型別 * <p> * KEYOUT : 是使用者自定義的邏輯處理完成後的自定義輸出資料的key ,我們這裡是單詞,型別為string 同上,Text * <p> * VALUEOUT: 是使用者自定義的邏輯處理完成後的自定義輸出value 型別,我們這裡是單詞數量Integer,同上,Integer 也有自己的型別 IntWritable * <p> */public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {    /**     * map 階段的業務邏輯就寫在map 方法內     * maptask 會對每一行輸入資料 就呼叫一次我們自定義的map 方法。     */    @Override    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {        //拿到輸入的這行資料        String line = value.toString();        //根據空格進行分割得到這行的單詞        String[] words = line.split(" ");        //將單詞輸出為 <word,1>        for (String word : words) {            //將單詞作為key ,將次數 做為value輸出,            // 這樣也利於後面的資料分發,可以根據單詞進行分發,            // 以便於相同的單詞落到相同的reduce task 上,方便統計            context.write(new Text(word), new IntWritable(1));        }    }}

接下來是 reduce task 邏輯:

/** * KEYIN VALUEIN 對於map 階段輸出的KEYOUT VALUEOUT * <p> * KEYOUT :是自定義 reduce 邏輯處理結果的key * VALUEOUT : 是自定義reduce 邏輯處理結果的 value */public class WordcountReduce extends Reducer<Text, IntWritable, Text, IntWritable> {    /**     * <zhouq,1>,<zhouq,1>,<zhouq,2> ......     * 入參key 是一組單詞的kv對 的 key     */    @Override    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {        //拿到當前傳送進來的 單詞//        String word = key.toString();        //        int count = 0;        for (IntWritable value : values) {            count += value.get();        }        //這裡的key  就是單詞        context.write(key, new IntWritable(count));    }}

 

最後是啟動類:

 

/** * wc 啟動類 */public class WordCountDriver {    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {        Configuration conf = new Configuration();        // mapreduce.framework.name 配置成 local 就是本地執行模式,預設就是local        // 所謂的叢集執行模式 yarn ,就是提交程式到yarn 上. 要想叢集執行必須指定下面三個配置.//        conf.set("mapreduce.framework.name", "yarn");//        conf.set("yarn.resoucemanager.hostname", "mini1");        //conf.set("fs.defaultFS","com.zhouq.hdfs://mini1:9000/");        Job job = Job.getInstance(conf);        //指定本程式的jar 包 所在的本地路徑        job.setJarByClass(WordCountDriver.class);        //指定本次業務的mepper 和 reduce 業務類        job.setMapperClass(WordCountMapper.class);        job.setReducerClass(WordcountReduce.class);        //指定mapper 輸出的 key  value 型別        job.setMapOutputKeyClass(Text.class);        job.setMapOutputValueClass(IntWritable.class);        //指定 最終輸出的 kv  型別        job.setOutputKeyClass(Text.class);        job.setOutputValueClass(IntWritable.class);        //指定job的輸入原始檔案所在目錄        FileInputFormat.setInputPaths(job,new Path(args[0]));        //指定job 輸出的檔案目錄        FileOutputFormat.setOutputPath(job,new Path(args[1]));        boolean waitForCompletion = job.waitForCompletion(true);        System.exit(waitForCompletion ? 0 : 1);    }}

 

配置啟動類引數:填寫輸入目錄和輸出目錄,注意輸出目錄不能存在,不然會執行失敗的。

 

執行我們就用編輯器執行,用本地模式,不提交到hadoop 叢集上,執行完成後,去到輸出目錄下可以看到這些檔案: 

 

然後輸出一下 part-r-00000 這個檔案:

程式碼地址:https://github.com/heyxyw/bigdata/blob/master/bigdatastudy/mapreduce/src/main/java/com/zhouq/mr/WordCountDriver.java

 

最後

希望對你有幫助。後面將會去講 MapReduce 是如何去執行的。

 

作者·往期內容:

記一次阿里巴巴一面的經歷

 


 

作者介紹:喬二爺,在成都喬二爺這個名字是之前身邊的同事給取的,也不知道為啥。也習慣了他們這樣叫我。

一直待在相對傳統一點的企業,有四年半的 Java 開發經驗,會點大資料的內容,也跟客戶打過一年的交道,還帶過 10個月 10人+的技術團隊,有一定的協調組織能力,能夠理解 boss 的工作內容,也能很好的配合別人做事。

 


 

Java 極客技術公眾號,是由一群熱愛 Java 開發的技術人組建成立,專注分享原創、高質量的 Java 文章。如果您覺得我們的文章還不錯,請幫忙讚賞、在看、轉發支援,鼓勵我們分享出更好的文章。

關注公眾號,大家可以在公眾號後臺回覆“部落格園”,免費獲得作者 Java 知識體系/面試必看資料。

相關推薦

MapReduce 程式設計模型 &amp; WordCount 示例

  學習大資料接觸到的第一個程式設計思想 MapReduce。   前言 之前在學習大資料的時候,很多東西很零散的做了一些筆記,但是都沒有好好去整理它們,這篇文章也是對之前的筆記的整理,或者叫輸出吧。一來是加深自己的理解,二來是希望這些東西能幫助想要學習大資料或者說正在學習大

Python實現一個最簡單的MapReduce程式設計模型WordCount

MapReduce程式設計模型: Map:對映過程 Reduce:合併過程 import operator from functools import reduce # 需要處理的資料 lst = [ "Tom", "Jack",

hadoop學習(7)—— 使用yarn執行mapreduce一個簡單的wordcount示例

1.hdfs檔案系統目錄要求(建議) /user /{username} --使用者名稱 /mr

MapReduce程式設計例項之WordCount

1.MapReduce計算框架 2.例項WordCount package org.apache.hadoop.examples; import java.io.IOException; import java.util.StringTokeni

MapReduce程式設計模型的認識

                                                                             MapReduce程式設計模型的認識 對於這個

Hadoop之MapReduce程式設計模型

一、MapReduce程式設計模型         MapReduce將作業的整個執行過程分為兩個階段:Map階段和Reduce階段         Map階段由一定數量的Map Task組成             輸入資料格式解析:InputFormat        

Mapreduce程式設計1之WordCount

Mapreduce是hadoop的計算框架,對資料的處理操作都要在這裡程式設計來實現功能。 這是我學習的第一個程式,也算是入門程式,相當於其他語言的helloworld,雖然還有很多不懂的地方,但相信通過以後的學習能夠懂更多東西。 WordCount 實現

Storm WordCount程式設計模型,併發度&amp;分組策略

程式設計模型: Spout /** * @program: WordCountSpout.class * @description: 傳輸資料到bolt,有一個抽象類BaseRichSpout,BaseRichBolt,一個介面IRichSpout,IRichBolt, * 常

Hadoop MapReduce 官方教程 -- WordCount示例

get pre red oop hadoop apache tor ria pac Hadoop MapReduce 官方教程 -- WordCount示例: http://hadoop.apache.org/docs/r1.0.4/cn/mapred_tutorial.h

MapReduce 程式執行演示(示例PI程式 wordcount程式)

你說的9000埠應該指的是fs.default.name或fs.defaultFS(新版本)這一配置屬性吧,這個屬性是描述叢集中NameNode結點的URI(包括協議、主機名稱、埠號) 50070其實是在hdfs-site.xml裡面的配置引數dfs.namenode.http-address,

Hadoop學習筆記—4.初識MapReduce 一、神馬是高大上的MapReduce   MapReduce是Google的一項重要技術,它首先是一個程式設計模型,用以進行大資料量的計算。對於大資料

Hadoop學習筆記—4.初識MapReduce 一、神馬是高大上的MapReduce   MapReduce是Google的一項重要技術,它首先是一個程式設計模型,用以進行大資料量的計算。對於大資料量的計算,通常採用的處理手法就是平行計算。但對許多開發

Cuda程式設計系列-Cuda程式設計基本概念&amp;程式設計模型

原文連結 系列文章: Cuda程式設計101:Cuda程式設計的基本概念及程式設計模型 Cuda程式設計102:Cuda程式效能相關話題 Cuda程式設計103:Cuda多卡程式設計 Cuda tips: nvcc的-code、-arch、-gencode選項 基本想法 在介紹編

MapReduce程式設計WordCount

課程回顧:     wordcount程式:     單機版:         統計的6個檔案         定義一個方法------讀取每一個小檔案進行統計         這個方法呼叫了6次         定義了一個最終統計的方法         這個方法呼叫了1次

hadoop學習(六)WordCount示例深度學習MapReduce過程(1)

        花了整整一個下午(6個多小時),整理總結,也算是對這方面有一個深度的瞭解。日後可以回頭多看看。         我們都安裝完Hadoop之後,按照一些案例先要跑一個WourdCount程式,來測試Hadoop安裝是否成功。在終端中用命令建立一個資料夾,簡單的

MapReduce程式設計 一步步地教你開啟 第一個程式wordcount

例項描述   計算出檔案中每個單詞的頻數,要求輸出結果按照單詞的字母順序進行排序,按照key-value格式輸出結果。 比如輸入檔案為: hello world hello hadoop hello mapreduce

雲星資料---Apache Flink實戰系列(精品版)】:Flink流處理API詳解與程式設計實戰002-Flink基於流的wordcount示例002

三、基於socket的wordcount 1.傳送資料 1.傳送資料命令 nc -lk 9999 2.傳送資料內容 good good study day day

MapReduce-大規模資料集分散式並行運算程式設計模型

    本文轉載自CSDN部落格,純為技術資料備份!     MapReduce的名字源於函數語言程式設計模型中的兩項核心操作:Map和Reduce。也許熟悉Functional Programming(FP)的人見到這兩個詞會倍感親切。因為Map和Reduce這兩個術語源自Lisp語言和函數語言程式設計。

WordCountMapReduce 計算模型

概述 雖然現在都在說大記憶體時代,不過記憶體的發展怎麼也跟不上資料的步伐吧。所以,我們就要想辦法減小資料量。這裡說的減小可不是真的減小資料量,而是讓資料分散開來。分開儲存、分開計算。這就是 MapReduce 分散式的核心。 版權說明 目錄

mapreduce程式設計模型

mapreduce程式設計模型核心為將資料運算流程分為兩個階段:    拆分,讀取原始資料,形成key-value資料(map方法);    聚合,將相同key的資料聚合到一組(reduce方法)。maptask:    讀資料:讀取源資料,maptask獲取分片資料資訊(型

Hadoop學習筆記—4.初識MapReduce 一、神馬是高大上的MapReduce   MapReduce是Google的一項重要技術,它首先是一個程式設計模型,用以進行大資料量的計算。對於大資料

一、神馬是高大上的MapReduce   MapReduce是Google的一項重要技術,它首先是一個程式設計模型,用以進行大資料量的計算。對於大資料量的計算,通常採用的處理手法就是平行計算。但對許多開發者來說,自己完完全全實現一個平行計算程式難度太大,而MapReduce就是一種簡化平行計算的程式設計模