1. 程式人生 > >hadoop隨手筆記(2)--mapreduce的執行機理

hadoop隨手筆記(2)--mapreduce的執行機理

(1)InputFormat輸入格式:
裡面定義了getSplits方法,主要將輸入的檔案分割成邏輯上的多個分片InputSplit,這裡面的分片不是真正意義上的分片,只是邏輯上的分片,每個分片同夥輸入檔案的路徑、開始時為止和偏移量三個資訊來唯一標識。
使用createRecordReader方法去建立一個RecorReader記錄讀取器,分別讀取輸入分片中的鍵值對,交給Map處理:在MapReduce作業的時候會做以下工作:1.檢查作業的輸入是否有效。2.將輸入檔案分割成多個分片,並交給單獨的Map進行處理,就是說map的個數是由分片的數量決定的。3.RecordReader作為傳遞的載體,將每個分片傳遞給map。
(2)CombineFileInputFormat組合檔案輸入格式
會將來自不同問價的分片打包成一個更大的輸入分片,但是不會將來自不同機架中的檔案放入到同一個輸入分片中。當用戶致電maxSpliteSize屬性時,會將同一個資料節點的資料塊組成一個輸入分片,剩餘的資料會與在同一個機架上其他手節點上的資料塊組成一個分片。
(資料本地化:將資料即時的在計算節點上進行計算,而不是將資料放到計算節點在進行計算)


MR基本的檔案輸入格式:FileInputFormat檔案輸入格式、TextInputFormat文字檔案輸入格式、KeyValueTextInputFormat鍵值對檔案輸入格式、CombineFileInputFormat組合檔案輸入格式,SequenceFileInputFormat序列檔案輸入格式、DBInputFormat資料庫輸入格式
sequenceFile序列檔案
是一個由二進位制形式key/value的位元組流組成的儲存檔案,類似於log檔案,但是log檔案儲存的是純文字檔案。SequenceFile可壓縮可切分,適合hadoop檔案儲存特性,SequenceFile分別提供了Writer、Reade、Sorter來實現對資料的讀、寫和排序,sequecefile是hdfs的一種檔案型別,提倡一次寫入多次讀取,但是並不提倡多一個檔案進行追加或者修改,其中該檔案的三種壓縮處理方式:
1.不壓縮資料直接儲存的方式。2壓縮value值而不壓縮key值儲存的儲存方式,使用enum.RECORD來標識。3key/value值都壓縮的方式儲存,使用enum.BLOCK來標識。
三種不同的壓縮方式共用一個數據頭,流方式的讀取會先讀取頭位元組去判斷是那種方式的壓縮,根據壓縮方式去解壓縮並反序列化位元組流資料,得到可識別的資料。
MapFile對映檔案
這類代表是一個基於從key到value的map檔案,這裡的一個map是一個目錄,該目錄下面包含兩個檔案,一個數據檔案,資料檔案中包含了map中的所有key姐value的值;另一個是一個比較小的索引檔案,包含了key的一個fraction。
(3)DBInputFormata資料庫輸入格式:
hadoop儲存開始的設計主要是針對於非結構化或者半結構化的資料進行儲存與分析。主要以資料庫記錄作為輸入格式,然後將記錄交個Map函式進行處理。其中主要是在getspilite()方法使用例如SQL語句來獲取結果集,並將結果集填充到輸入分片中,在這裡與M/R進行建立聯絡。避免資源的浪費,提高系統的效能,通過finally來保證在盡心資料操作的過程中是否出現了異常,當我們使用完資料庫資源的以後都將它們成功釋放掉。最後在createRecordReader方法會返回一個針對不同資料庫產品來進行讀取操作的RecordReader。
(4)MultipleInput多種輸入格式
它並沒有繼承InputFormat類,也就是說不能直接作為Map輸入來處理。只要是為下面的DelegatingInputFormat輸入格式提供輔助作用。M/R作業可以有不同的輸入路徑,並且不同的輸入路徑對應著不用的InputFormat,同時也有可能不同的輸入路徑對應著不同的Map函式。
(5)DelegatingInputFormat授權輸入格式
該類的主要作用是將不同的輸入路徑的處理授權給不同的InputFormat,而且每一個路徑對應一個單獨Mapper處理程式,在該類內部主要通過MultipleInput來獲得Path與InputFormat、Mapper的對應關係。
(6)輸入分片
輸入分片InputSplite是一個單獨的Map要處理的資料單元。輸入分片的資料型別一般都是位元組型別的。輸入分片經過相應的RecordReader處理後,轉化成記錄試圖的形式,然後交給Map處理。FileSplite類預設的是InputSplite,CombineFileSplite與CombineFileInputFormat輸入格式相對應,對於前者是一個檔案的一個輸入分片,而厚重是代表來自多個檔案的輸入分片組成的一個輸入分片,其中的每個CombineFileSplite會包含來自不同檔案的資料塊,但是在同一個分片中的所有資料塊都是在同一個機架上。
(7)記錄讀取器RecordReader
初始化方法(只是執行一次)->取得輸入分片中的下一個鍵值對->取得當前讀取到的鍵值對中的鍵物件->取得鍵值對中的值物件->取得當前資料讀取的進度,該方法會返回一個0.0--1.0之間的浮點數。
(8)輸出格式OutputFormat
OutputFormat類描述了M/R作業的輸出規範,決定了該作業的輸出結果儲存到哪裡以及如何對輸出結果進行持久化操作,完成以下的工作:1.檢查作業的輸出是否有效,比如目錄是否存在,hadoop在執行作業之前是不允許輸出目錄存在的。2.提供一個RecordWrite類,將M/R作業的處理結果報錯到指定檔案系統的檔案中。
(9)RecordWriter記錄寫入器
OutputFormat提供了RecordWriter用於將M/R作業的鍵值對結果寫入到指定的輸出中。
(10)輸出提交器OutputCommitter
主要是控制M/R作業的輸出環境,主要用來完成以下工作:1.在OutputCommitter初始化的時候啟動job。比如會建立job的臨時輸出目錄。2.在作業完成之後會清楚job申請的資源。比如會刪除job的臨時輸出目錄。3.為Mapper或者Reducer任務建立臨時的輸出目錄。4.檢查Mapper或者Reducer任務是否需要提交。5提交或者丟棄MapReucer任務的輸出。
(11)Context與ID
hadoop中的每一個job以及組成的job的Map與Reduce任務都有一個唯一的標識,其中,JobID是系統分配給作業的唯一標識,他的字串形式為job_<jobtrackerID>_<jobNumber>。TaskID是系統分配給任務的唯一標識,TaskID包含了它所屬的作業ID,同時也有任務ID,同時還保持了這是否是一個Map任務。TaskID是字串標識為task_<jobtrackerID>_<jobNumber>_[m|r]_<taskNumber>.
(12)OutputCommitter類
主要作用是在作業啟動的時候在作業的輸出目錄下建立相應的臨時目錄,Mapper與Reducer的輸出首先會寫入到這些臨時目錄當中,當他們都成功完成後,臨時目錄的內容會轉移到輸出目錄,之後臨時目錄會被刪除。
(13)jobID
用來定位到job、殺死該job、查詢該job執行情況。在執行一個job的時候會分成若干個Task,TaskAttemptID主要是在Mapper或者Reducer任務失敗的時候重新執行該任務,為了標識同一個任務的多次嘗試,使用該ID.
(14)Map處理過程
Mapper函式的最核心作用就是對輸入的key/value進行處理,然後輸出一系列的key/value集合。M/R框架會為由InputFormat生成的每一個輸入分片InputSplit建立一個相應的Mapper處理函式,但是一個輸入key/value鍵值對可能對應著0個或者多個輸出key/value對。然後將具有相同輸出的key的key/value鍵值對放在一起,然後將它們分發給相同的Reducer來處理,使用者可以通過指定特定的RawComparator實現類來控制分組過程的執行,也可以通過制定Partitioner實現類來控制Mapper的輸出被分發給哪個具體的Reducer進行處理。由於Reducer和Mapper一般會執行在不同的主機上,所以Reducer必須通過網路來獲取Mapper的輸出結果作為輸入。
為了減少網路上的資料傳輸量,我們可以為作業指定相應的Combiner。該類一般會採用已經實現的Reducer來代替。Combiner會在Mapper端對Mapper的輸出結果進行本地的聚集處理,從而減少傳送給Reducer的資料量。另外一種減少Mapper傳送給Reducer的資料的方法是採用想用的壓縮機只對Mapper的輸出進行壓縮處理,這同時也減少了Mapper輸出的key/value所佔的儲存空間的大小。此外,並不是所有的hadoop作業都有Reducer處理函式,當沒有Reducer時,Mapper的處理結果會直接通過制定的OutputFormat寫入到輸出目錄中。
在Mapper類中,setup()方法會在mapper任務開始的時候被呼叫一次,cleanup方法也只會在任務結束的時候被呼叫一次,
分類:InverseMapper反轉Mapper、TokenCounterMapper標記計數Mapper、MultithreadedMapper多執行緒Mapper(主要工作原理就是啟動多個執行緒來執行另一個Mapper中的map方法,這種方式可以有效的提供系統處理作業的能力,預設為10個執行緒)
(15)FiledSelectionMapper欄位選擇Mapper
將輸入資料看做由使用者指定的分隔符分隔的不同欄位組成,預設的分隔符是tab。FiledSelectionMapper可以選擇輸入欄位列表中的若干個欄位作為輸出的key和value。
(16)DelegatingMapper授權Mapper
(17)Reducer的數量是可以通過job的setNumReduceTasks來進行設定,
1.shuffle階段:首先利用http網路協議將所有的Mapper的輸出中與該Rducer相關的資料複製到Reducer的主機上。
2.sort階段:M/R框架會將來自不同Mapper的具有相同key的輸出key/value鍵值對按照key進行排列。
3.Reduce階段:M/R框架會為已經分好組的每一個<key,(list of values)>呼叫一次reduce方法,Reducer的輸出鍵值對是通過TaskInputOutputContext.write方法寫入到RecordWrite中,然後由RecordWrite寫入到真正的檔案系統中。其中Reducer的輸出並不是排好序的。
(18)Partitioner分割槽處理
主要是將Mapper處理之後的資料按照key分發給不同的Reducer任務進行處理
1.平均分佈:即每個Reducer處理的Record數量應該儘可能相等
2.高效:由於在分發給不同的reducer之前都要經過partition進行處理,所以這個部分需要高效的演算法實現。
(19)TotalOrderPartitioner全排序分割槽
每個mapper的輸出是排好序的,但是不同的mapper的輸出之間是沒有順序的,為了實現最終的Reducer的輸出是排好序的,此時可以使用該類。為了排序,首先利用InputSample資料取樣器來確定資料的分佈情況。
(20)JobClient的執行過程分析

job的waitForCompletion方法內部實際上是依靠JobClient來向JobTracker來提交作業的,當JobTracker接收到JobClient的提交作業的請求後,會將作業加入到佇列中,之後會返回給JobClient一個用於唯一標識作業的JobID物件。JobTracker作業佇列中的作業會有TaskTracker來執行。TaskTracker會定期向JobTracker傳送心跳,查詢JobTracker是否有任務需要進行。如果有,JobTracker會將任務通過心跳響應分配給TaskTracker來執行。當TaskTracker接收到任務之後,會在本地啟動一個Task來執行任務。


(21)其中job的設定在內部實現的時候也是對其conf進行設定,也可以對JVM進行設定。
(22)JobSunbmissionProtocol作業提交介面
這個協議藉口是JobClient和JobTracker進行通訊所需要的協議藉口,在該介面總定義了JobClient用於向JobTracker提交作業、獲取作業的執行資訊等方法。該介面的實現類是JobTracker與LocalJobRunner。
(23)RunningJob正在執行的Job作業的介面,可以執行該例項去查詢正在執行的job的相關資訊。RunningJob中很多方法名和Job以及提到的JobSubmissionProtocol中的方法名一樣,當客戶端呼叫Job物件中的一個方法時,比如killJob方法,此時在reduceProgress內部呼叫的是Job中的JobClient返回的RunningJob物件中相對應的方法來完成相應的功能。而JobClient返回的RunningJob物件例項內部呼叫的是JobSubmissonProtocol介面實現類中所對應的方法,從而我們可以看出真正為我們提供功能的是JobSubmissionProtocol實現類JobTracker或者LocalJobRunner。
(24)JobStatus物件代表的是Job當前執行狀態資訊,這些資訊無時無刻在改變,例如進度資訊。JobProfile物件代表的是Job新增到M/R框架之後所攜帶的註冊資訊,這些資訊是不會進行改變的。
(25)JobClient提交作業流程
1.向JobTracker請求一個新的作業ID物件JobID。2.檢查Job的輸入輸出。hadoop不允許MR作業的輸入為空,而且使用者可以為MR作業指定多個輸入目錄。但是在執行作業前,輸出目錄是不能存在的,它是有MR框架為我們建立的。3.計算出作業的所有InputSplit輸入分片數即需要的Mapper任務的數量。如果無法對輸入檔案進行劃分,MR框架會丟擲異常。4.啟動與Job相關的分散式快取DistributedCache。5.將作業執行需要的資源包括jar包、配置檔案等從hadoop的分散式檔案複製到JobTracker的檔案系統的指定目錄中。6.最後,將作業提交到JobTracker的作業佇列中,並監控作業的執行狀況。
(26)Job隊形的submit或者waitForCompletion(內部也是呼叫的submit)來進行作業的提交,但是該方法的內部物件JobClient呼叫submitJobInternal方法來真正執行的,其實Job就是JobClient進行了包裝。JobClient是使用者作業與JobTracker進行互動的介面。
(27)使用者指定的檔案系統與JobTracker的檔案系統。

當hadoop啟動的時候,JobTracker是作為單獨的一個JVM來執行的,包含有一個main方法是jobTracker的入口函式。JobTracker會一直等待JobClient通過RPC來提交作業,它排程JobClient提交的作業中的每一個任務,並監控他們的執行。當發現有失敗的任務的時,JobTracker會重新執行它。並且每一個TaskTracker會一直向JobTracker傳送心跳,詢問JobTracker是否有任務需要進行處理,如果有任務要執行,JobTracker會將任務分配給TaskTracker來執行。

(28)Queue job佇列物件,JobTracker是以佇列的方式來管理使用者提交的job的,Queue代表JobTracker中的一個Job佇列,當用戶提交Job的時候,會將job新增到使用者指定的佇列中。使用QueueManager Job佇列管理物件管理所有的job佇列,MR框架可以根據不同的作業排程器來定義一個或者多個作業佇列。一些排程器只能在一個佇列上面進行工作,而一些排程器可以在多個佇列上進行工作。

(29)執行緒池中的執行緒重用,可以降低記憶體資源的利用度。

(30)JobTracker中的作業回去管理器RecoveryManager的主要作業就是在JobTracker宕機或者重啟的時候,回覆執行那些還有完成的job。JobTracker管理著一個系統目錄,在JobClient向JobTracker提交job之前,會為提交的Job在系統的目錄下面建立一個以Job的id命名的子目錄,然後會把Job所需要的一切檔案複製到它的目錄下面,系統目錄是由mapred.systerm.dir配置項所指定的。當一個job完成的時候,job會將系統目錄下的子目錄刪除,JobTracker節點在重啟的時候也會檢查系統目錄下有沒有子目錄。

(31)JobTracker作為一個單獨的java程序來協調整個Job的執行,JobTracker負責排程處理JobClient提交的作業中的每一個任務,JobTracker會將Job分割成單獨Task,然後提交到TaskTracker來執行。

(32)hadoop作業排程器概述

1.JobQueueTaskScheduler(FIFO排程器)

在hadoop早期的版本中,JobTracker是按照使用者提交作業的順序即先進先出排程演算法來執行作業的。使用者提交的所有作業會被新增到一個執行佇列中,然後JobTracker首先會按照作業的優先順序的高低,再按照作業提交的先後順序來選擇需要執行的作業。當使用這種演算法時候,如果先提交的作業的執行時間很長的話,改作業之後的作業會遲遲得不到解決,從而影響使用者的體驗。

2.LimitTasksPerJobTaskScheduler

該排程器繼承自JobQueueTaskScheduler,它在JobQueueTaskScheduler的基礎上增加了對每個Job總的任務數的限制。

3.CapacityTaskScheduler(計算能力排程器)

支援多個佇列,每個佇列可配置一定的資源量,在每個對壘內部採用FIFO排程策略。預設情況下該排程器內部不支援優先順序,但是可以在配置檔案中開啟此選項。如果支援優先順序,那麼排程演算法就是帶有優先順序的FIFO演算法。但是該排程器不支援優先順序搶佔,一旦一個作業開始執行,在執行完之前它的資源不會被高優先順序作業所搶佔。為了防止同一個使用者的作業獨佔佇列的資源,該排程器會對同一使用者提交的作業所佔資源量進行限定。該排程器執行排程時候,首先會計算每個佇列中正在執行的任務數與其應該分得的計算資源之間的比值,選擇一個該比值最小的佇列;選擇了一個佇列之後,然後按照佇列中的作業優先順序和提交時間順序來選擇一個作業,同時需要考慮使用者組員量限制和記憶體限制。

4.FairScheduler(公平排程器)

發明於facebook,作用的物件與計算能力排程器一樣。公平排程器保證了小弟任務能夠快速的相應,同時也保證了最大任務的服務水平。

在hadoop,JobQueueTaskScheduler與LimitTasksPerJobTaskScheduler被載入在hadoop中,但是CapacityTaskScheduler與FairScheduler排程器作為hadoop的擴充套件模組,需要進行配置。