1. 程式人生 > >MapReduce簡單介紹及入門程式

MapReduce簡單介紹及入門程式

1、MapReduce 入門

1.1、什麼是 MapReduce
hadoop 的四大元件:
HDFS:分散式儲存系統
MapReduce:分散式計算系統
YARN:hadoop 的資源排程系統
Common:以上三大元件的底層支撐元件,主要提供基礎工具包和 RPC 框架等

MapReduce 是一個分散式運算程式的程式設計框架
,是使用者開發“基於 Hadoop 的資料分析應用”的核心框架
1.2、為什麼需要 MapReduce

  1. 海量資料在單機上處理因為硬體資源限制,無法勝任
  2. 而一旦將單機版程式擴充套件到叢集來分散式執行,將極大增加程式的複雜度和開發難度
  3. 引入 MapReduce 框架後,開發人員可以將絕大部分工作集中在業務邏輯的開發上,而將分散式計算中的複雜性交由框架來處理

Hadoop 當中的 MapReduce 就是這樣的一個分散式程式運算框架,它把大量分散式程式都會
涉及的到的內容都封裝進了,讓使用者只用專注自己的業務邏輯程式碼的開發。它對應以上問題
的整體結構如下:

  • MRAppMaster:MapReduce Application Master,分配任務,協調任務的執行
    MapTask:階段併發任,負責 mapper 階段的任務處理 YARNChild
    ReduceTask:階段彙總任務,負責 reducer 階段的任務處理 YARNChild

1.3、MapReduce 程式執行j簡單wordcount演示

package Test;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

// 資料樣式為 hello hi nice to meet you
public class MyMapReduceWordCount {
    public static  class MyMap extends Mapper<LongWritable, Text, Text, IntWritable>{
    	@Override
    	protected void map(LongWritable key, Text value,Context context)
    			throws IOException, InterruptedException {
    		String[] fields = value.toString().split(" ");
    		for(String s: fields){
    			context.write(new Text(s), new IntWritable(1));
    		}
    	}
    }
    
    public static class MyReduce extends Reducer<Text, IntWritable, Text, IntWritable>{
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    	  Context context) throws IOException, InterruptedException {
    		int count=0;
    		for(IntWritable i:values){
    			count+=i.get();
    		} 
    		context.write(key, new IntWritable(count));
    	}
    }
   public static void main(String[] args) {
		Configuration conf=new Configuration();
		System.setProperty("HADOOP_USER_NAME", "qyl");
		conf.set("fs.defaultFS", "hdfs://qyl01:9000");
		try {
			Job job=Job.getInstance(conf);
			job.setJarByClass(MyMapReduceWordCount.class);
			job.setMapperClass(MyMap.class);
			job.setReducerClass(MyReduce.class);
			
			job.setOutputKeyClass(Text.class);
			job.setOutputValueClass(IntWritable.class);
			
			Path inPath =new Path("/wordcount");
			FileInputFormat.addInputPath(job, inPath);
			
			Path outpath=new Path("/wordcount/wordcount_result");
			if(outpath.getFileSystem(conf).exists(outpath)){
				outpath.getFileSystem(conf).delete(outpath, true);
			}
			FileOutputFormat.setOutputPath(job, outpath);
			
			job.waitForCompletion(true);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

2、MapReduce 程式的核心執行

2.1、概述
一個完整的 MapReduce 程式在分散式執行時有兩類例項程序:
1、MRAppMaster:負責整個程式的過程排程及狀態協調
2、Yarnchild:負責 map 階段的整個資料處理流程
3、Yarnchild:負責 reduce 階段的整個資料處理流程以上兩個階段 MapTask 和 ReduceTask 的程序都是 YarnChild,並不是說這 MapTask 和ReduceTask 就跑在同一個 YarnChild 進行裡

2.2、MapReduce 程式的執行流程
1、一個 mr 程式啟動的時候,最先啟動的是 MRAppMaster,MRAppMaster 啟動後根據本次
job 的描述資訊,計算出需要的 maptask 例項數量,然後向叢集申請機器啟動相應數量的
maptask 程序
2、 maptask 程序啟動之後,根據給定的資料切片(哪個檔案的哪個偏移量範圍)範圍進行數
據處理,主體流程為:

  • A、利用客戶指定的 InputForm來獲取 RecordReader 讀取資料,形成輸入 KV 對
  • B、將輸入 KV 對傳遞給客戶定義的 map()方法,做邏輯運算,並將 map()方法輸出的 KV 對收 集到快取
    C、將快取中的 KV 對按照 K分割槽排序後不斷溢寫到磁碟檔案

3、 MRAppMaster 監控到所有 maptask 程序任務完成之後(真實情況是,某些 maptask 程序處理完成後,就會開始啟動 reducetask 去已完成的 maptask 處 fetch 資料),會根據客指定的引數啟動相應數量的 reducetask 程序,並告知 reducetask 程序要處理的資料範圍(資料分割槽)
4、Reducetask 程序啟動之後,根據 MRAppMaster 告知的待處理資料所在位置,從若干臺maptask 執行所在機器上獲取到若干個 maptask 輸出結果檔案,並在本地進行重新歸併序,然後按照相同 key 的 KV 為一個組,呼叫客戶定義的 reduce()方法進行邏輯運算,並收集運算輸出的結果 KV,然後呼叫客戶指定的 OutputFormat 將結果資料輸出到外部儲存

2.3、MapTask 並行度決定機制
maptask 的並行度決定 map 階段的任務處理併發度,進而影響到整個 job 的處理速度那麼,mapTask 並行例項是否越多越好呢?其並行度又是如何決定呢?一個 job 的 map 階段並行度由客戶端在提交 job 時決定,客戶端對 map 階段並行度的規劃的基本邏輯為:

將待處理資料執行邏輯切片(即按照一個特定切片大小,將待處理資料劃分成邏輯上的多
個 split),然後每一個 split 分配一個 mapTask 並行例項處理

2.4、切片機制
FileInputFormat 中預設的切片機制
1、簡單地按照檔案的內容長度進行切片
2、切片大小,預設等於 block 大小
3、切片時不考慮資料集整體,而是逐個針對每一個檔案單獨切片
比如待處理資料有兩個檔案:
File1.txt 200M
File2.txt 100M
經過 getSplits()方法處理之後,形成的切片資訊是:
File1.txt-split1 0-128M
File1.txt-split2

2.5、ReduceTask 並行度決定機制
reducetask 的並行度同樣影響整個 job 的執行併發度和執行效率,但與 maptask 的併發數由切片數決定不同,Reducetask 數量的決定是可以直接手動設定:job.setNumReduceTasks(4);預設值是 1,手動設定為 4,表示執行 4 個 reduceTask,設定為 0,表示不執行 reduceTask 任務,也就是沒有 reducer 階段,只有 mapper 階段
如果資料分佈不均勻,就有可能在 reduce 階段產生資料傾斜
注意:reducetask 數量並不是任意設定,還要考慮業務邏輯需求,有些情況下,需要計算全域性彙總結果,就只能有 1 個 reducetask。
儘量不要執行太多的 reducetask。對大多數 job 來說,最好 rduce 的個數最多和叢集中的reduce 持平,或者比叢集的 reduce slots 小。這個對於小叢集而言,尤其重要。