1. 程式人生 > >大數據開發 | MapReduce介紹

大數據開發 | MapReduce介紹

file 數據開發 編程模式 silver red 文本文 接口 runner data-

1. MapReduce 介紹

1.1MapReduce的作用

假設有一個計算文件中單詞個數的需求,文件比較多也比較大,在單擊運行的時候機器的內存受限,磁盤受限,運算能力受限,而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度,因此這個工作可能完成不了。針對以上這個案例,MapReduce在這裏能起到什麽作用呢,引入MapReduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜性交由框架來處理。

可見在程序由單機版擴成分布式時,會引入大量的復雜工作。為了提高開發效率,可以將分布式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。而

MapReduce就是這樣一個分布式程序的通用框架。

1.2MapReduce架構圖

技術分享

MapReduce 也采用了 Master/SlaveM/S)架構。它主要由以下幾個組件組成 :ClientJobTrackerTaskTracker Task。下面分別對這幾個組件進行介紹。

1Client

用戶編寫的MapReduce程序通過Client提交到JobTracker同時用戶可通過Client提供的一些接口查看作業運行狀態。在Hadoop內部用作業Job)表示MapReduce程序。一個 MapReduce程序可對應若幹個作業,而每個作業會被分解成若幹個Map/Reduce

任務(Task)。

2JobTracker

JobTracker 主要負責資源監控和作業調度。JobTracker 監控所有 TaskTracker 與作業Job的健康狀況,一旦發現失敗情況後,其會將相應的任務轉移到其他節點;同時,JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器,而調度器會在資源出現空閑時,選擇合適的任務使用這些資源。在Hadoop 中,任務調度器是一個可插拔的模塊,用戶可以根據自己的需要設計相應的調度器。

3TaskTracker

   TaskTracker會周期性地通過Heartbeat將本節點上資源的使用情況和任務的運行進度匯報給

JobTracker,同時接收JobTracker發送過來的命令並執行相應的操作(如啟動新任務、殺死 任務等)。TaskTracker 使用“slot”等量劃分本節點上的資源量。 “slot”代表計算資源(CPU、 內存等)。一個 Task 獲取到一個slot 後才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot分為Map slotReduce slot 兩種,分別供Map TaskReduce Task使用。TaskTracker通過slot數目(可配置參數)限定Task的並發度。

4Task

Task 分為 Map Task Reduce Task 兩種,均由TaskTracker啟動。從上一小節中我們知道,HDFS以固定大小的block 為基本單位存儲數據,而對於MapReduce 而言,其處理單位是splitsplit block 的對應關系如下圖所示。split 是一個邏輯概念,它只包含一些元數據信息,比如 數據起始位置、數據長度、數據所在節點等。它的劃分方法完全由用戶自己決定。但需要註意的是,split的多少決定了Map Task的數目,因為每個split會交由一個Map Task處理。

技術分享

Map Task 執行過程如下圖所示。由該圖可知,Map Task 先將對應的split 叠代解析成一 個個 key/value 對,依次調用用戶自定義的map() 函數進行處理,最終將臨時結果存放到本地磁盤上,其中臨時數據被分成若幹個partition(分片),每個partition 將被一個Reduce Task處理。

技術分享

Reduce Task 執行過程如下圖所示。該過程分為三個階段:

從遠程節點上讀取Map Task 中間結果(稱為“Shuffle階段);

按照keykey/value 對進行排序(稱為“Sort階段);

依次讀取 <key, value list>,調用用戶自定義的 reduce() 函數處理,並將最終結果存到HDFS上(稱為“Reduce 階段)。

技術分享

MapReduce是一種並行編程模式,利用這種模式軟件開發者可以輕松地編寫出分布式並行程序。在Hadoop的體系結構中,MapReduce是一個簡單易用的軟件框架,基於它可以將任務分發到由上千臺商用機器組成的集群上,並以一種可靠容錯的方式並行處理大量的數據集,實現Hadoop的並行任務處理功能。MapReduce框架是由一個單獨運行在主節點的JobTrack和運行在每個集群從節點的TaskTrack共同組成的。

主節點負責調度構成一個作業的所有任務,這些任務分布在不同的節點上。主節點監控它們的執行情況,並且重新執行之前失敗的任務;

從節點僅負責由主節點指派的任務。

當一個Job任務被提交時,JobTrack接收到提交作業和其配置信息之後,就會配置信息等發給從節點,同時調度任務並監控TaskTrack的執行。

1.3MapReduce程序運行演示

Hadoop的發布包中內置了一個hadoop-mapreduce-example-2.6.5.jar,這個jar包中有各種MR示例程序,可以通過以下步驟運行:

啟動hdfsyarn,然後在集群中的任意一臺服務器上啟動執行程序(比如運行wordcount):

hadoop jar hadoop-mapreduce-example-2.6.5.jar wordcount /wordcount/data /wordcount/out

2.MapReduce 編程

2.1編程規範

1) 用戶編寫的程序分成三個部分:MapperReducerDriver(提交運行mr程序的客戶端)

2) Mapper的輸入數據是KV對的形式(KV的類型可自定義)

3) Mapper的輸出數據是KV對的形式(KV的類型可自定義)

4) Mapper中的業務邏輯寫在map()方法中

5) map()方法(maptask進程)對每一個<K,V>調用一次

6) Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV

7) Reducer的業務邏輯寫在reduce()方法中

8) Reducetask進程對每一組相同k<k,v>組調用一次reduce()方法

9) 用戶自定義的MapperReducer都要繼承各自的父類

10) 整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象

2.2wordcount 示例編寫

需求:在一堆給定的文本文件中統計輸出每一個單詞出現的總次數

(1)定義一個mapper

//首先要定義四個泛型的類型
//keyin:  LongWritable    valuein: Text
//keyout: Text            valueout:IntWritable

public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    //map方法的生命周期:  框架每傳一行數據就被調用一次
    //key :  這一行的起始點在文件中的偏移量
    //value: 這一行的內容
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //拿到一行數據轉換為string
        String line = value.toString();
        //將這一行切分出各個單詞
        String[] words = line.split(" ");
        //遍歷數組,輸出<單詞,1>
        for(String word:words){
            context.write(new Text(word), new IntWritable(1));
        }
    }
}

(2)定義一個reducer

//生命周期:框架每傳遞進來一個kv 組,reduce方法被調用一次
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        //定義一個計數器
        int count = 0;
        //遍歷這一組kv的所有v,累加到count中
        for(IntWritable value:values){
            count += value.get();
        }
        context.write(key, new IntWritable(count));
    }
}

(3)定義一個主類,用來描述job並提交job

public class WordCountRunner {
    //把業務邏輯相關的信息(哪個是mapper,哪個是reducer,要處理的數據在哪裏,輸出的結果放哪裏……)描述成一個job對象
    //把這個描述好的job提交給集群去運行
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job wcjob = Job.getInstance(conf);
        //指定我這個job所在的jar包
//        wcjob.setJar("/home/hadoop/wordcount.jar");
        wcjob.setJarByClass(WordCountRunner.class);
        
        wcjob.setMapperClass(WordCountMapper.class);
        wcjob.setReducerClass(WordCountReducer.class);
        //設置我們的業務邏輯Mapper類的輸出key和value的數據類型
        wcjob.setMapOutputKeyClass(Text.class);
        wcjob.setMapOutputValueClass(IntWritable.class);
        //設置我們的業務邏輯Reducer類的輸出key和value的數據類型
        wcjob.setOutputKeyClass(Text.class);
        wcjob.setOutputValueClass(IntWritable.class);
        
        //指定要處理的數據所在的位置
        FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt");
        //指定處理完成之後的結果所保存的位置
        FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/"));
        
        //向yarn集群提交這個job
        boolean res = wcjob.waitForCompletion(true);
        System.exit(res?0:1);
    }

2.3集群運行模式

1) mapreduce程序提交給yarn集群resourcemanager,分發到很多的節點上並發執行

2) 處理的數據和輸出結果應該位於hdfs文件系統

3) 提交集群的實現步驟:

將程序打成JAR包,然後在集群的任意一個節點上用hadoop命令啟動hadoop jar wordcount.jar cn.bigdata.mrsimple.WordCountDriver inputpath outputpath

作者:傑瑞教育
出處:http://www.cnblogs.com/jerehedu/
版權聲明:本文版權歸傑瑞教育技有限公司和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。
技術咨詢:技術分享

大數據開發 | MapReduce介紹