1. 程式人生 > >hadoop細節---Mapreduce過程詳解

hadoop細節---Mapreduce過程詳解

一.先回顧一下hadoop api中的資料型別:

               BooleanWritable:標準布林型數值

             ByteWritable:單位元組數值

             DoubleWritable:雙位元組數值

             FloatWritable:浮點數

             IntWritable:整型數

             LongWritable:長整型數

             Text:使用UTF8格式儲存的文字(類似java中的string

             NullWritable:<key, value>

中的keyvalue為空時使用

二.Map-Reduce執行過程:

          

1.Job類初始化JobClient例項,JobClient中生成JobTrackerRPC例項,這樣可以保持與JobTracker的通訊,JobTracker的地址和埠等都是外部配置的,通過Configuration物件讀取並且傳入。

2.JobClient提交作業。

3.JobClient生成作業目錄。

4.從本地拷貝MapReduce的作業jar檔案(一般是自己寫的程式程式碼jar)

5.如果DistributedCache中有需要的資料,從DistributedCache中拷貝這部分資料。

6.根據InputFormat例項,實現輸入資料的split,在作業目錄上生成job.splitjob.splitmetainfo檔案。

7.將配置檔案寫入到作業目錄的job.xml檔案中。

8.JobClientJobTracker通訊,提交作業。

9.JobTrackerjob加入到job佇列中。

10.JobTrackerTaskScheduler(任務排程器)對job佇列進行排程。

11.TaskTracker通過心跳和JobTracker保持聯絡,JobTracker收到後根據心跳帶來的資料,判斷是否可以分配給TaskTracker TaskTaskScheduler會對Task

進行分配。

12.TaskTracker啟動TaskRunner例項,在TaskRunner中啟動單獨的JVM進行Mapper執行。

13.Map端會從HDFS中讀取輸入資料,執行之後Map輸出資料先是在記憶體當中,當達到閥值後,split到硬碟上面,在此過程中如果有combiner的話要進行combiner,當然sort是肯定要進行的。

14.Map結束了,Reduce開始執行,從Map端拷貝資料,稱為shuffle階段,之後執行reduce輸出結果資料。

15.jobtracker收到作業最後一個任務完成通知後,便把作業的狀態設定為“完成”。

16.jobclient查詢狀態時,便知道任務已經完成,於是從runjob()方法返回。

三.Map端機制:

自定義一個類,繼承於基類Mapper,該基類是一個泛型,有4個形參型別:用來指定map函式的輸入鍵、輸入值,輸出鍵、輸出值,如下public class Map extends Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>,,再重寫map函式。每一對<key,value>呼叫一次map函式。

(一)map輸入做的事:

1.反射構造InputFormat.                                      

2.反射構造InputSplit.

3.建立RecordReader.

4.反射建立MapperRunner

(二)map輸出做的事:

1.如果有Partitioner的話,反射構造Partitioner
2.key/value/Partitioner資料寫入到記憶體當中。
3.當記憶體當中的資料達到一定閥值了,需要spill到硬碟上面,在spill前,需要進行排序,如果有combiner的話需要進行combiner
4.sort的規則是先進行Partitioner的排序,然後再進行key的字典排序,預設的是快速排序。
5.當生成多個spill檔案時,需要進行歸併,最終歸併成一個大檔案。

關於排序:
1.在記憶體中進行排序,整個資料的記憶體不會進行移動,只是再加上一層索引的資料,排序只要調整索引資料就可以了
2.多個spill檔案歸併到一個大檔案時,是一個歸併排序的過程,每一個spill檔案都是按分割槽和key排序好的,所以歸併完的檔案也是按分割槽和key排序好的。

                                

   在進行歸併的時候,也不是一次性的把所有的spill檔案歸併成一個大檔案,而是部分spill檔案歸併成中間檔案,然後中間檔案和剩下的spill檔案再進行一次歸併,依次類推,這個的考慮還是因為一次歸併檔案太多的話IO消耗太大了,如下圖:

四.Reduce端機制:

自定義一個類,繼承於基類Reducer,該基類是一個泛型,有4個形參型別:用來指定reduce函式的輸入鍵、輸入值,輸出鍵、輸出值、

public class Reduce extends Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,其中reduce的輸入型別必須與map的輸出型別一致。再重寫reduce方法。每一個key呼叫一次reduce方法。

(一)reduce輸入做的事:
1。ReduceTask有一個執行緒和TaskTracker聯絡,之後TaskTracker和JobTracker聯絡,獲取MapTask完成事件
2. ReduceTask會建立和MapTask數目相等的拷貝執行緒,用於拷貝MapTask的輸出資料,MapTask的資料一般都是非本地的
3. 當有新的MapTask完成事件時,拷貝執行緒就從指定的機器上面拷貝資料,是通過http的形式進行拷貝
4. 當資料拷貝的時候,分兩種情況,當資料量小的時候就會寫入記憶體當中,當資料量大的時候就會寫入硬碟當中,這些工作分別由兩個執行緒完成
5. 因為所有的資料都來自不同的機器,所以有多個檔案,這些檔案需要歸併成一個檔案,在拷貝檔案的時候就會進行歸併動作
6. 拷貝和歸併過程統稱為shuffle過程。

(二)Reduce輸出做的事:
1.構造RecordWriter,這個是根據客戶端設定的OutputFormatgetRecordWriter()方法得到
2.通過OutputFormatRecordWriter將結果輸出到臨時檔案中
3.Rudece進行commit過程,和TaskTracker進行通訊,TaskTrackerJobTracker進行通訊,然後JobTracker返回commit的指令,Reduce進行
commit,將臨時結果檔案重新命名成最終的檔案
4.commit成功後,kill掉其他的TaskAttempt

   五.主函式(作業的配置方法):

             publicstatic void main(String[] args) throws Exception{

               Configuration conf = newConfiguration();

             String[]otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();

             if(otherArgs.length != 2) {

             System.err.println("Usage:Data ** <in> <out>");

             System.exit(2);

             }

             Jobjob = new Job(conf, "**");

             job.setJarByClass(**.class);

/ /設定MapReduce處理類

             job.setMapperClass(Map.class);

//job.setCombinerClass(IntSumReducer.class);

             job.setReducerClass(Reduce.class);

             //設定輸出型別

             job.setOutputKeyClass(IntWritable.class);

             job.setOutputValueClass(IntWritable.class);

             //設定輸入和輸出目錄

             FileInputFormat.addInputPath(job,new Path(otherArgs[0]));

             FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));

             System.exit(job.waitForCompletion(true)? 0 : 1);

             }