1. 程式人生 > >Hadoop架構設計、執行原理具體解釋

Hadoop架構設計、執行原理具體解釋

下一個 發現 job調度 mmap tin 文件復制 必須 new 最大

1、Map-Reduce的邏輯過程

如果我們須要處理一批有關天氣的數據。其格式例如以下:

  • 依照ASCII碼存儲。每行一條記錄
  • 每一行字符從0開始計數,第15個到第18個字符為年
  • 第25個到第29個字符為溫度。當中第25位是符號+/-

0067011990999991950051507+0000+

0043011990999991950051512+0022+

0043011990999991950051518-0011+

0043012650999991949032412+0111+

0043012650999991949032418+0078+

0067011990999991937051507+0001+

0043011990999991937051512-0002+

0043011990999991945

051518+0001+

0043012650999991945032412+0002+

0043012650999991945032418+0078+

如今須要統計出每年的最高溫度。

Map-Reduce主要包含兩個步驟:Map和Reduce

每一步都有key-value對作為輸入和輸出:

  • map階段的key-value對的格式是由輸入的格式所決定的。假設是默認的TextInputFormat。則每行作為一個記錄進程處理。當中key為此行的開頭相對於文件的起始位置。value就是此行的字符文本
  • map階段的輸出的key-value對的格式必須同reduce階段的輸入key-value對的格式相相應

對於上面的樣例,在map過程,輸入的key-value對例如以下:

(0,0067011990999991950051507+0000+)

(33,0043011990999991950051512+0022+)

(66,0043011990999991950051518-0011+)

(99,0043012650999991949032412+0111+)

(132,0043012650999991949032418+0078+)

(165,0067011990999991937051507+0001+)

(198,0043011990999991937051512-0002+)

(231,0043011990999991945051518+0001+)

(264,0043012650999991945032412+0002+)

(297,0043012650999991945032418+0078+)

在map過程中。通過對每一行字符串的解析,得到年-溫度的key-value對作為輸出:

(1950, 0)

(1950, 22)

(1950, -11)

(1949, 111)

(1949, 78)

(1937, 1)

(1937, -2)

(1945, 1)

(1945, 2)

(1945, 78)

在reduce過程。將map過程中的輸出。依照同樣的key將value放到同一個列表中作為reduce的輸入

(1950, [0, 22, –11])

(1949, [111, 78])

(1937, [1, -2])

(1945, [1, 2, 78])

在reduce過程中,在列表中選擇出最大的溫度,將年-最大溫度的key-value作為輸出:

(1950, 22)

(1949, 111)

(1937, 1)

(1945, 78)

其邏輯過程可用例如以下圖表示:

技術分享

下圖大概描寫敘述了Map-Reduce的Job執行的基本原理:

技術分享

以下我們討論JobConf。其有非常多的項能夠進行配置:

  • setInputFormat:設置map的輸入格式。默覺得TextInputFormat,key為LongWritable,value為Text
  • setNumMapTasks:設置map任務的個數。此設置通常不起作用,map任務的個數取決於輸入的數據所能分成的inputsplit的個數
  • setMapperClass:設置Mapper。默覺得IdentityMapper
  • setMapRunnerClass:設置MapRunner, maptask是由MapRunner執行的。默覺得MapRunnable,其功能為讀取inputsplit的一個個record,依次調用Mapper的map函數
  • setMapOutputKeyClass和setMapOutputValueClass:設置Mapper的輸出的key-value對的格式
  • setOutputKeyClass和setOutputValueClass:設置Reducer的輸出的key-value對的格式
  • setPartitionerClass和setNumReduceTasks:設置Partitioner。默覺得HashPartitioner,其依據key的hash值來決定進入哪個partition,每一個partition被一個reduce task處理,所以partition的個數等於reducetask的個數
  • setReducerClass:設置Reducer,默覺得IdentityReducer
  • setOutputFormat:設置任務的輸出格式,默覺得TextOutputFormat
  • FileInputFormat.addInputPath:設置輸入文件的路徑,能夠使一個文件,一個路徑,一個通配符。能夠被調用多次加入多個路徑
  • FileOutputFormat.setOutputPath:設置輸出文件的路徑,在job執行前此路徑不應該存在

當然不用全部的都設置。由上面的樣例。能夠編寫Map-Reduce程序例如以下:

public class MaxTemperature {

publicstatic void main(String[] args) throws IOException {

if (args.length != 2) {

System.err.println("Usage: MaxTemperature <inputpath> <outputpath>");

System.exit(-1);

}

JobConf conf = new JobConf(MaxTemperature.class);

conf.setJobName("Max temperature");

FileInputFormat.addInputPath(conf, new Path(args[0]));

FileOutputFormat.setOutputPath(conf, new Path(args[1]));

conf.setMapperClass(MaxTemperatureMapper.class);

conf.setReducerClass(MaxTemperatureReducer.class);

conf.setOutputKeyClass(Text.class);

conf.setOutputValueClass(IntWritable.class);

JobClient.runJob(conf);

}

}

3、Map-Reduce數據流(data flow)

Map-Reduce的處理過程主要涉及下面四個部分:

  • clientClient:用於提交Map-reduce任務job
  • JobTracker:協調整個job的執行。其為一個Java進程,其main class為JobTracker
  • TaskTracker:執行此job的task,處理input split,其為一個Java進程,其mainclass為TaskTracker
  • HDFS:hadoop分布式文件系統,用於在各個進程間共享Job相關的文件

技術分享

3.1、任務提交

JobClient.runJob()創建一個新的JobClient實例,調用其submitJob()函數。

  • 向JobTracker請求一個新的job ID
  • 檢測此job的output配置
  • 計算此job的input splits
  • 將Job執行所需的資源復制到JobTracker的文件系統中的目錄中,包含jobjar文件。job.xml配置文件,input splits
  • 通知JobTracker此Job已經能夠執行了

提交任務後,runJob每隔一秒鐘輪詢一次job的進度,將進度返回到命令行,直到任務執行完成。

3.2、任務初始化

當JobTracker收到submitJob調用的時候,將此任務放到一個隊列中,job調度器將從隊列中獲取任務並初始化任務。

初始化首先創建一個對象來封裝job執行的tasks, status以及progress。

在創建task之前,job調度器首先從共享文件系統中獲得JobClient計算出的input splits。

其為每一個input split創建一個map task。

每一個task被分配一個ID。

3.3、任務分配

TaskTracker周期性的向JobTracker發送heartbeat。

在heartbeat中。TaskTracker告知JobTracker其已經準備執行一個新的task。JobTracker將分配給其一個task。

在JobTracker為TaskTracker選擇一個task之前。JobTracker必須首先依照優先級選擇一個Job,在最高優先級的Job中選擇一個task。

TaskTracker有固定數量的位置來執行map task或者reduce task。

默認的調度器對待map task優先於reduce task

當選擇reduce task的時候。JobTracker並不在多個task之間進行選擇,而是直接取下一個,由於reducetask沒有數據本地化的概念。

3.4、任務運行

TaskTracker被分配了一個task,以下便要執行此task。

首先。TaskTracker將此job的jar從共享文件系統中復制到TaskTracker的文件系統中。

TaskTracker從distributed cache中將job執行所須要的文件復制到本地磁盤。

其次,其為每一個task創建一個本地的工作文件夾。將jar解壓縮到文件文件夾中。

其三,其創建一個TaskRunner來執行task。

TaskRunner創建一個新的JVM來執行task。

被創建的child JVM和TaskTracker通信來報告執行進度。

3.4.1、Map的過程

MapRunnable從inputsplit中讀取一個個的record,然後依次調用Mapper的map函數,將結果輸出。

map的輸出並非直接寫入硬盤,而是將其寫入緩存memory buffer。

當buffer中數據的到達一定的大小。一個背景線程將數據開始寫入硬盤。

在寫入硬盤之前,內存中的數據通過partitioner分成多個partition。

在同一個partition中,背景線程會將數據依照key在內存中排序。

每次從內存向硬盤flush數據。都生成一個新的spill文件。

當此task結束之前。全部的spill文件被合並為一個整的被partition的並且排好序的文件。

reducer能夠通過http協議請求map的輸出文件,tracker.http.threads能夠設置http服務線程數。

3.4.2、Reduce的過程

當map task結束後。其通知TaskTracker。TaskTracker通知JobTracker。

對於一個job,JobTracker知道TaskTracer和map輸出的相應關系。

reducer中一個線程周期性的向JobTracker請求map輸出的位置,直到其取得了全部的map輸出。

reduce task須要其相應的partition的全部的map輸出。

reduce task中的copy過程即當每一個map task結束的時候就開始拷貝輸出。由於不同的maptask完畢時間不同。

reduce task中有多個copy線程,能夠並行拷貝map輸出。

當非常多map輸出復制到reduce task後。一個背景線程將其合並為一個大的排好序的文件。

當全部的map輸出都復制到reduce task後,進入sort過程,將全部的map輸出合並為大的排好序的文件。

最後進入reduce過程,調用reducer的reduce函數,處理排好序的輸出的每一個key。最後的結果寫入HDFS。

技術分享

3.5、任務結束

當JobTracker獲得最後一個task的執行成功的報告後,將job得狀態改為成功。

當JobClient從JobTracker輪詢的時候。發現此job已經成功結束,則向用戶打印消息,從runJob函數中返回。


如有不懂,歡迎撥打10010或10086。轉何哲江。

Hadoop架構設計、執行原理具體解釋