hadoop細節---Mapreduce過程詳解
一.先回顧一下hadoop api中的資料型別:
BooleanWritable:標準布林型數值
ByteWritable:單位元組數值
DoubleWritable:雙位元組數值
FloatWritable:浮點數
IntWritable:整型數
LongWritable:長整型數
Text:使用UTF8格式儲存的文字(類似java中的string)
NullWritable:當<key, value>
二.Map-Reduce執行過程:
1.Job類初始化JobClient例項,JobClient中生成JobTracker的RPC例項,這樣可以保持與JobTracker的通訊,JobTracker的地址和埠等都是外部配置的,通過Configuration物件讀取並且傳入。
2.JobClient提交作業。
3.JobClient生成作業目錄。
4.從本地拷貝MapReduce的作業jar檔案(一般是自己寫的程式程式碼jar)。
5.如果DistributedCache中有需要的資料,從DistributedCache中拷貝這部分資料。
6.根據InputFormat例項,實現輸入資料的split,在作業目錄上生成job.split和job.splitmetainfo檔案。
7.將配置檔案寫入到作業目錄的job.xml檔案中。
8.JobClient和JobTracker通訊,提交作業。
9.JobTracker將job加入到job佇列中。
10.JobTracker的TaskScheduler(任務排程器)對job佇列進行排程。
11.TaskTracker通過心跳和JobTracker保持聯絡,JobTracker收到後根據心跳帶來的資料,判斷是否可以分配給TaskTracker
Task,TaskScheduler會對Task
12.TaskTracker啟動TaskRunner例項,在TaskRunner中啟動單獨的JVM進行Mapper執行。
13.Map端會從HDFS中讀取輸入資料,執行之後Map輸出資料先是在記憶體當中,當達到閥值後,split到硬碟上面,在此過程中如果有combiner的話要進行combiner,當然sort是肯定要進行的。
14.Map結束了,Reduce開始執行,從Map端拷貝資料,稱為shuffle階段,之後執行reduce輸出結果資料。
15.當jobtracker收到作業最後一個任務完成通知後,便把作業的狀態設定為“完成”。
16.在jobclient查詢狀態時,便知道任務已經完成,於是從runjob()方法返回。
三.Map端機制:
自定義一個類,繼承於基類Mapper,該基類是一個泛型,有4個形參型別:用來指定map函式的輸入鍵、輸入值,輸出鍵、輸出值,如下public class Map extends Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>,,再重寫map函式。每一對<key,value>呼叫一次map函式。
(一)map輸入做的事:
1.反射構造InputFormat.
2.反射構造InputSplit.
3.建立RecordReader.
4.反射建立MapperRunner
(二)map輸出做的事:
1.如果有Partitioner的話,反射構造Partitioner。
2.將key/value/Partitioner資料寫入到記憶體當中。
3.當記憶體當中的資料達到一定閥值了,需要spill到硬碟上面,在spill前,需要進行排序,如果有combiner的話需要進行combiner。
4.sort的規則是先進行Partitioner的排序,然後再進行key的字典排序,預設的是快速排序。
5.當生成多個spill檔案時,需要進行歸併,最終歸併成一個大檔案。
關於排序:
1.在記憶體中進行排序,整個資料的記憶體不會進行移動,只是再加上一層索引的資料,排序只要調整索引資料就可以了
2.多個spill檔案歸併到一個大檔案時,是一個歸併排序的過程,每一個spill檔案都是按分割槽和key排序好的,所以歸併完的檔案也是按分割槽和key排序好的。
在進行歸併的時候,也不是一次性的把所有的spill檔案歸併成一個大檔案,而是部分spill檔案歸併成中間檔案,然後中間檔案和剩下的spill檔案再進行一次歸併,依次類推,這個的考慮還是因為一次歸併檔案太多的話IO消耗太大了,如下圖:
四.Reduce端機制:
自定義一個類,繼承於基類Reducer,該基類是一個泛型,有4個形參型別:用來指定reduce函式的輸入鍵、輸入值,輸出鍵、輸出值、
public class Reduce extends Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>,其中reduce的輸入型別必須與map的輸出型別一致。再重寫reduce方法。每一個key呼叫一次reduce方法。
(一)reduce輸入做的事:
1。ReduceTask有一個執行緒和TaskTracker聯絡,之後TaskTracker和JobTracker聯絡,獲取MapTask完成事件
2. ReduceTask會建立和MapTask數目相等的拷貝執行緒,用於拷貝MapTask的輸出資料,MapTask的資料一般都是非本地的
3. 當有新的MapTask完成事件時,拷貝執行緒就從指定的機器上面拷貝資料,是通過http的形式進行拷貝
4. 當資料拷貝的時候,分兩種情況,當資料量小的時候就會寫入記憶體當中,當資料量大的時候就會寫入硬碟當中,這些工作分別由兩個執行緒完成
5. 因為所有的資料都來自不同的機器,所以有多個檔案,這些檔案需要歸併成一個檔案,在拷貝檔案的時候就會進行歸併動作
6. 拷貝和歸併過程統稱為shuffle過程。
(二)Reduce輸出做的事:
1.構造RecordWriter,這個是根據客戶端設定的OutputFormat中getRecordWriter()方法得到
2.通過OutputFormat和RecordWriter將結果輸出到臨時檔案中
3.Rudece進行commit過程,和TaskTracker進行通訊,TaskTracker和JobTracker進行通訊,然後JobTracker返回commit的指令,Reduce進行
commit,將臨時結果檔案重新命名成最終的檔案
4.commit成功後,kill掉其他的TaskAttempt
五.主函式(作業的配置方法):
publicstatic void main(String[] args) throws Exception{
Configuration conf = newConfiguration();
String[]otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length != 2) {
System.err.println("Usage:Data ** <in> <out>");
System.exit(2);
}
Jobjob = new Job(conf, "**");
job.setJarByClass(**.class);
/ /設定Map和Reduce處理類
job.setMapperClass(Map.class);
//job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(Reduce.class);
//設定輸出型別
job.setOutputKeyClass(IntWritable.class);
job.setOutputValueClass(IntWritable.class);
//設定輸入和輸出目錄
FileInputFormat.addInputPath(job,new Path(otherArgs[0]));
FileOutputFormat.setOutputPath(job,new Path(otherArgs[1]));
System.exit(job.waitForCompletion(true)? 0 : 1);
}