大數據開發 | MapReduce介紹
1. MapReduce 介紹 |
1.1MapReduce的作用
假設有一個計算文件中單詞個數的需求,文件比較多也比較大,在單擊運行的時候機器的內存受限,磁盤受限,運算能力受限,而一旦將單機版程序擴展到集群來分布式運行,將極大增加程序的復雜度和開發難度,因此這個工作可能完成不了。針對以上這個案例,MapReduce在這裏能起到什麽作用呢,引入MapReduce框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分布式計算中的復雜性交由框架來處理。
可見在程序由單機版擴成分布式時,會引入大量的復雜工作。為了提高開發效率,可以將分布式程序中的公共功能封裝成框架,讓開發人員可以將精力集中於業務邏輯。而
1.2MapReduce架構圖
MapReduce 也采用了 Master/Slave(M/S)架構。它主要由以下幾個組件組成 :Client、JobTracker、 TaskTracker 和 Task。下面分別對這幾個組件進行介紹。
(1)Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端;同時用戶可通過Client提供的一些接口查看作業運行狀態。在Hadoop內部用“作業” (Job)表示MapReduce程序。一個 MapReduce程序可對應若幹個作業,而每個作業會被分解成若幹個Map/Reduce
(2)JobTracker
JobTracker 主要負責資源監控和作業調度。JobTracker 監控所有 TaskTracker 與作業Job的健康狀況,一旦發現失敗情況後,其會將相應的任務轉移到其他節點;同時,JobTracker 會跟蹤任務的執行進度、資源使用量等信息,並將這些信息告訴任務調度器,而調度器會在資源出現空閑時,選擇合適的任務使用這些資源。在Hadoop 中,任務調度器是一個可插拔的模塊,用戶可以根據自己的需要設計相應的調度器。
(3)TaskTracker
TaskTracker會周期性地通過Heartbeat將本節點上資源的使用情況和任務的運行進度匯報給
(4)Task
Task 分為 Map Task 和 Reduce Task 兩種,均由TaskTracker啟動。從上一小節中我們知道,HDFS以固定大小的block 為基本單位存儲數據,而對於MapReduce 而言,其處理單位是split。 split 與 block 的對應關系如下圖所示。split 是一個邏輯概念,它只包含一些元數據信息,比如 數據起始位置、數據長度、數據所在節點等。它的劃分方法完全由用戶自己決定。但需要註意的是,split的多少決定了Map Task的數目,因為每個split會交由一個Map Task處理。
Map Task 執行過程如下圖所示。由該圖可知,Map Task 先將對應的split 叠代解析成一 個個 key/value 對,依次調用用戶自定義的map() 函數進行處理,最終將臨時結果存放到本地磁盤上,其中臨時數據被分成若幹個partition(分片),每個partition 將被一個Reduce Task處理。
Reduce Task 執行過程如下圖所示。該過程分為三個階段:
①從遠程節點上讀取Map Task 中間結果(稱為“Shuffle階段”);
②按照key對key/value 對進行排序(稱為“Sort階段”);
③依次讀取 <key, value list>,調用用戶自定義的 reduce() 函數處理,並將最終結果存到HDFS上(稱為“Reduce 階段”)。
MapReduce是一種並行編程模式,利用這種模式軟件開發者可以輕松地編寫出分布式並行程序。在Hadoop的體系結構中,MapReduce是一個簡單易用的軟件框架,基於它可以將任務分發到由上千臺商用機器組成的集群上,並以一種可靠容錯的方式並行處理大量的數據集,實現Hadoop的並行任務處理功能。MapReduce框架是由一個單獨運行在主節點的JobTrack和運行在每個集群從節點的TaskTrack共同組成的。
主節點負責調度構成一個作業的所有任務,這些任務分布在不同的節點上。主節點監控它們的執行情況,並且重新執行之前失敗的任務;
從節點僅負責由主節點指派的任務。
當一個Job任務被提交時,JobTrack接收到提交作業和其配置信息之後,就會配置信息等發給從節點,同時調度任務並監控TaskTrack的執行。
1.3MapReduce程序運行演示
Hadoop的發布包中內置了一個hadoop-mapreduce-example-2.6.5.jar,這個jar包中有各種MR示例程序,可以通過以下步驟運行:
啟動hdfs,yarn,然後在集群中的任意一臺服務器上啟動執行程序(比如運行wordcount):
hadoop jar hadoop-mapreduce-example-2.6.5.jar wordcount /wordcount/data /wordcount/out
2.MapReduce 編程 |
2.1編程規範
1) 用戶編寫的程序分成三個部分:Mapper,Reducer,Driver(提交運行mr程序的客戶端)
2) Mapper的輸入數據是KV對的形式(KV的類型可自定義)
3) Mapper的輸出數據是KV對的形式(KV的類型可自定義)
4) Mapper中的業務邏輯寫在map()方法中
5) map()方法(maptask進程)對每一個<K,V>調用一次
6) Reducer的輸入數據類型對應Mapper的輸出數據類型,也是KV
7) Reducer的業務邏輯寫在reduce()方法中
8) Reducetask進程對每一組相同k的<k,v>組調用一次reduce()方法
9) 用戶自定義的Mapper和Reducer都要繼承各自的父類
10) 整個程序需要一個Drvier來進行提交,提交的是一個描述了各種必要信息的job對象
2.2wordcount 示例編寫
需求:在一堆給定的文本文件中統計輸出每一個單詞出現的總次數
(1)定義一個mapper類
//首先要定義四個泛型的類型 //keyin: LongWritable valuein: Text //keyout: Text valueout:IntWritable public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ //map方法的生命周期: 框架每傳一行數據就被調用一次 //key : 這一行的起始點在文件中的偏移量 //value: 這一行的內容 @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //拿到一行數據轉換為string String line = value.toString(); //將這一行切分出各個單詞 String[] words = line.split(" "); //遍歷數組,輸出<單詞,1> for(String word:words){ context.write(new Text(word), new IntWritable(1)); } } }
(2)定義一個reducer類
//生命周期:框架每傳遞進來一個kv 組,reduce方法被調用一次 @Override protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { //定義一個計數器 int count = 0; //遍歷這一組kv的所有v,累加到count中 for(IntWritable value:values){ count += value.get(); } context.write(key, new IntWritable(count)); } }
(3)定義一個主類,用來描述job並提交job
public class WordCountRunner { //把業務邏輯相關的信息(哪個是mapper,哪個是reducer,要處理的數據在哪裏,輸出的結果放哪裏……)描述成一個job對象 //把這個描述好的job提交給集群去運行 public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job wcjob = Job.getInstance(conf); //指定我這個job所在的jar包 // wcjob.setJar("/home/hadoop/wordcount.jar"); wcjob.setJarByClass(WordCountRunner.class); wcjob.setMapperClass(WordCountMapper.class); wcjob.setReducerClass(WordCountReducer.class); //設置我們的業務邏輯Mapper類的輸出key和value的數據類型 wcjob.setMapOutputKeyClass(Text.class); wcjob.setMapOutputValueClass(IntWritable.class); //設置我們的業務邏輯Reducer類的輸出key和value的數據類型 wcjob.setOutputKeyClass(Text.class); wcjob.setOutputValueClass(IntWritable.class); //指定要處理的數據所在的位置 FileInputFormat.setInputPaths(wcjob, "hdfs://hdp-server01:9000/wordcount/data/big.txt"); //指定處理完成之後的結果所保存的位置 FileOutputFormat.setOutputPath(wcjob, new Path("hdfs://hdp-server01:9000/wordcount/output/")); //向yarn集群提交這個job boolean res = wcjob.waitForCompletion(true); System.exit(res?0:1); }
2.3集群運行模式
1) 將mapreduce程序提交給yarn集群resourcemanager,分發到很多的節點上並發執行
2) 處理的數據和輸出結果應該位於hdfs文件系統
3) 提交集群的實現步驟:
將程序打成JAR包,然後在集群的任意一個節點上用hadoop命令啟動hadoop jar wordcount.jar cn.bigdata.mrsimple.WordCountDriver inputpath outputpath
作者:傑瑞教育
出處:http://www.cnblogs.com/jerehedu/
版權聲明:本文版權歸煙臺傑瑞教育科技有限公司和博客園共有,歡迎轉載,但未經作者同意必須保留此段聲明,且在文章頁面明顯位置給出原文連接,否則保留追究法律責任的權利。
技術咨詢:
大數據開發 | MapReduce介紹