1. 程式人生 > >10天Hadoop快速突擊(3)——開發MapReduce應用程式

10天Hadoop快速突擊(3)——開發MapReduce應用程式

開發MapReduce應用程式

一、系統引數的配置

1.通過API對相關元件的引數進行配置

這些API被分成了一下幾個部分:

org.apache.hadoop.conf:定義了系統引數的配置檔案處理API

org.apache.hadoop.fs:定義了抽象的檔案系統API

org.apache.hadoop.dfs:Hadoop分散式檔案系統(HDFS)模組的實現

org.apache.hadoop.mapred:Hadoop分散式計算系統(MapReduce)模組的實現,包括任務的分發排程

org.apache.hadoop.ipc:用在網路伺服器和客戶端的工具,封裝了網路非同步I/O的基礎模組

org.apache.hadoop.io:定義了通用I/O API,用於針對網路、資料庫、檔案等資料物件進行讀寫操作

Configuration類由源來設定,每個源包含以XML形式出現的一系列屬性/值對。每個源以一個字串或一個路徑來命名,如果是以字串命名,則通過類路徑檢查該字串代表的路徑是否存在;如果是以路徑命名的,則直接通過本地檔案系統進行檢查,而不用類物件。

2.多個配置檔案的整合

使用兩個資源configuration-default.xml和configuration-site.xml來定義配置。將資源按順序新增到Configuration之中。後新增進來的屬性取值覆蓋掉前面所新增資源中的屬性取值。但是,有一個特例,被標記為final的屬性不能被後面定義的屬性覆蓋。重寫標記為final的屬性通常會報告配置錯誤,同時會有警告資訊被記錄下來以便為診斷所用。

管理員將守護程序地址檔案之中的屬性標記為final,可防止使用者在客戶端配置檔案中或作業提交引數中改變其取值。

Hadoop預設使用兩個源進行配置,並按順序載入core-default.xml和core-site.xml。其中core-default.xml用於定義系統預設的屬性,core-site.xml用於定義在特定的地方重寫。

二、配置開發環境

首先下載準備使用的Hadoop版本,然後將其解壓到用於開發的主機上。接著,在IDE中建立一個新的專案,將解壓後的資料夾根目錄下的JAR檔案和lib目錄之下的JAR檔案加入到classpath中。之後就可以編譯Hadoop程式,並且可以在IDE中以本地模式執行。

Hadoop有三種不同的執行方式:單機模式、偽分佈模式、完全分佈模式。這裡使用偽分佈模式,它執行的檔案系統的HDFS,能夠模擬完全分佈模式,看到一些分散式處理的效果。

常用的方式是編寫和除錯程式在單機或偽分佈模式下進行,而實際處理大資料則完全分佈模式下進行。

三、編寫MapReduce程式

程式分為兩個部分:Map部分和Reduce部分,分別實現Map和Reduce的功能。

1.Map處理

Mapper處理的資料是由InputFormat分解過的資料集,其中InputFormat的作用是將資料集切割成小資料集InputSplit,每一個InputSplit將由一個Mapper負責處理。此外InputFormat中還提供了一個RecordReader的實現,並將一個InputSplit解析成<key,value>對提供給Map函式。InputFormat的預設值為TextInputFormat,它針對文字檔案,按行將文字檔案切割成InputSplit,並用LineRecordReader將InputSplit解析成<key,value>對,key是行在文字中的位置,value是檔案中的一行。

2.Reduce處理

Map處理的結果會通過partition分發到Reduce,Reduce做完Reduce操作後,將通過OutputFormat輸出結果。

Mapper最終處理的結果<key,value>對會被送到Reduce中進行合併,在合併的時候,有相同的key的鍵/值對會被送到同一個Reducer上。Reducer是所有使用者定製Reducer類的基類,它的輸入是key及這個key對應的所有value的一個迭代器,還有Reducer的上下文。Reduce處理的結果將通過Reducer.Context的write方法輸出到檔案中。

四、本地測試

如果程式要Eclipse中執行,則使用者需要在run configuration中設定好引數,輸入的資料夾名為input,輸出的資料夾名為output。

五、執行MapReduce程式

1.打包

為了能夠在命令列中執行程式,首先需要對它進行編譯和打包。

2.在本地模式下執行

使用下面的命令以本地模式執行打包後的程式:

hadoop jar ***.jar inputPath outputPath

3.在叢集上執行

首先,將輸入的檔案複製到HDFS中,

hadoop dfs -copyFromLocal /home/user/hadoop/inputFile inputFile

然後,執行程式

hadoop jar /home/user/hadoop/inputFile.jar inputFile input output

六、網路使用者介面

Hadoop自帶的網路使用者介面http://jobtracker-host:50030/

1.JobTracker頁面

JobTracker頁面主要包括五個部分:

第一部分是Hadoop安裝的詳細資訊,包括版本號、編譯完成時間、JobTracker當前的執行狀態和開始時間

第二部分是叢集的一個總結資訊:叢集容量(用叢集上可用的Map和Reduce任務槽的數量表示)及使用情況、叢集上執行的Map和Reduce的數量、提交的工作總量、當前可用的TaskTracker節點數和每個節點平均可用槽的數量。

第三部分是一個正在執行的工作日程表。開啟能看到工作的序列。

第四部分顯示的是正在執行、完成、失敗的工作,這些資訊通過表格來體現。表格中的每一行代表一個工作並顯示了工作的ID號、所屬者、名字和程序資訊。

第五部分是頁面的最下面JobTracker日誌的連線和JobTracker的歷史資訊:JobTracker執行的所有工作資訊。歷史記錄是永久儲存的。

2.工作頁面

點選一個工作的ID號,進入工作頁面,在工作頁面的頂部是一個關於工作的一些總結性基本資訊,如工作所屬者、名字、執行時間等。工作檔案是工作的加強配置檔案,包括在工作執行期間所有有效的屬性及它們的取值。

當工作執行時,可以在頁面上監控它的進展情況,因為頁面會週期性更新。

Reduce完成圖分為3個階段:複製(發生在Map輸出轉交給Reduce的TaskTracker時)、排序(發生在Reduce輸入合併時)和Reduce(發生在Reduce函式起作用併產生最終輸出時)。

3.返回結果

可以通過幾種方式得到結果:

1)通過命令列直接顯示輸出資料夾中的檔案。

2)將輸出的檔案從HDFS複製到本地檔案系統上,在本地檔案系統上檢視。

3)通過Web介面檢視輸出的結果。

4.任務頁面

工作頁面中的一些連結可以用來檢視該工作中任務的詳細資訊。

任務頁面顯示的資訊以表格形式來體現,表中的每一行都表示一個任務,它包含了諸如開始時間、結束時間之類的資訊,以及由TaskTracker提供的錯誤資訊和檢視單個任務的計數器的連結。

5.任務細節頁面

在任務頁面上可以點選任何任務來得到關於它的詳細資訊。

七、效能調優

效能調優具體來講包括兩個方面的內容:一個是時間效能,另一個是空間效能。衡量效能的指標就是,能夠在正確完成功能的基礎上,使執行的時間儘量短、佔用的空間儘量小。

1.輸入採用大檔案

HDFS塊的預設大小為64M。

如果不對小檔案做合併的預處理,也可借用Hadoop中的CombineFileInputFormat,它可以將多個檔案打包到一個輸入單元中,從而每次執行Map操作就會處理更多的資料。同時,ConbineFileInputFormat會考慮節點和叢集的位置資訊,以決定哪些檔案被打包到一個單元之中,所以使用CombineFileInputFormat也會使效能得到相應地提高。

2.壓縮檔案

在分散式系統中,不同節點的資料交換時影響整體效能的一個重要因素。另外在Hadoop的Map階段所處理的輸出大小也會影響整個MapReduce程式的執行時間。這是應為Map階段的輸出首先儲存在一定大小的記憶體緩衝區中,如果Map輸出的大小超出一定限度,Map task就會將結果寫入磁碟,等Map任務結束後再將它們複製到Reduce任務的節點上。如果資料量大,中間的資料交換會佔用很多的時間。

一個提高效能的方法是對Map的輸出進行壓縮。這樣的好處有:減少儲存檔案的空間、加快資料在網路上(不同節點間)的傳輸速度,減少資料在記憶體和磁碟間交換的時間。

可以通過將mapred.compress.map.output屬性設定為true來對Map的輸出資料進行壓縮,同時還可以設定Map輸出資料的壓縮格式,通過設定mapred.output.compression.codec屬性即可進行壓縮格式的設定。

3.過濾資料

資料過濾主要指在面對海量輸入資料作業時,在作業執行之間先將資料中無用資料、噪聲資料和異常資料清除。通過資料過濾可以降低資料處理的規模,較大程度低提高資料處理效率,同時避免異常資料或不規範資料對最終結果造成的負面影響。

如何進行資料過濾呢?在MapReduce中可以根據過濾條件利用很多辦法完成資料預處理中的資料過濾,如編寫預處理程式,在程式中新增過濾條件,形成真正的資料;也可以在資料處理任務的最開始程式碼處加上過濾條件;還可以使用特殊的過濾器資料結果來完成過濾。

Bloom Filter是一種二進位制向量資料結構。在儲存所有集合元素特徵的同時,它能在保證高效空間效率和一定出錯率的前提下迅速檢測一個元素是不是集合中的成員。而且Bloom Filter的誤報(false positive)只會發生在檢測集合內的資料上,而不會對集合外的資料產生漏報(false negative)。這樣每個檢測請求返回有“在集合內(可能錯誤)”和“不在集合內(絕對不在集合內)”兩種情況,可見Bloom Filter犧牲了極少正確率換取時間和空間,所以它不適合那些“零錯誤”的應用場合。在MapReduce中,Bloom Filter由Bloom Filter類(此類繼承了Filter類、Filter類實現了Writable序列化介面)實現,使用add(Key key)函式將一個key值加入Filter,使用membershipTest(Key key)來測試某個key是夠在Filter內。

Bloom Filter具體是如何實現的呢?如何保證空間和時間的高效性呢?如何用正確率換取時間和空間呢?(基於MapReduce中實現的Bloom Filter程式碼進行分析)Bloom Filter自始至終是一個M位的位陣列。有兩個重要介面,分別是add()和membershipTest(),add()負責儲存集合元素的特徵到位陣列,在儲存所有集合元素特徵之後可以使用membershipTest()來判斷某個值是否是集合中的元素。

在初始狀態下,Bloom Filter的所有位都被初始化為0。為了表示集合中的所有元素,Bloom Filter使用k個互相獨立的Hash函式,它們分別將集合中的每個元素對映到(1,2,...,M)這個範圍上,對映的位置作為此元素特徵值的一維,並將位陣列中此位置的值設定為1,最終得到的k個Hash函式值將形成集合元素的特徵值向量,同時此向量也被儲存在位陣列中。從獲取k個Hash函式值到修改對應位陣列值,這就是add介面所完成的任務。

利用add介面將所有集合元素的特徵值向量儲存到Bloom Filter之後,就可以使用此過濾去也就是membershipTest介面來判斷某個值是否是集合元素。在判斷時,首先還是計算待判斷值的特徵值向量,也就是k個Hash函式值,然後判斷特徵值向量每一維對應的位陣列位置上的值是否是1,如果全部是1,那麼membershipTest返回True,否則返回false,這就是判斷值是否存在於集合中的原理。

正是Hash函式衝突的可能性導致誤判的可能。由於Hash函式衝突,兩個值的特徵值向量也有可能衝突(k個Hash函式全部衝突)。如果兩個值只有一個集合元素,那麼該值的特徵值向量會儲存在位陣列中,從而在判斷另外一個非集合元素的值時,會發現該值的特徵值向量已經儲存在位陣列中,最終返回true,形成誤判。

有哪些因素影響錯誤率?Hash函式的個數和位陣列的大小影響了錯誤率。位陣列越大,特徵值向量衝突的可能性越小,錯誤率也小。在位陣列大小一定的情況下,Hash函式個數越多,形成的特徵值向量維數越多,衝突的可能性越小;但是維數越多,佔用的位陣列位置越多,又提高了衝突的可能性。所以在實際中,應根據實際需要和一定的估計來確定合適的陣列規模和雜湊函式規模。

在Bloom Filter中插入元素和查詢值都是O(1)的操作;同時它並不儲存元素而是採用位陣列儲存特徵值,並且每一位都可以重複利用。但是錯誤率限制了Bloom Filter的使用場景,只允許誤報的場景;同時由於一位多用,Bloom Filter並不支援刪除集合元素,在刪除某個元素時可能會同時刪除另外一個元素的部分特徵值。

4.修改作業屬性

屬性mapred.tasktracker.map.tasks.maximum的預設值是2,屬性mapred.tasktracker.reduce.tasks.maximum的預設值也是2,因此每個節點上實際處於執行狀態的Map和Reduce的任務數量最多為2,而較為理想的數值應在10~100之間。因此,可以在conf目錄下修改屬性mapred.tasktracker.map.tasks.maximum和mapred.tasktracker.reduce.tasks.maximum的取值,將它們設定為一個較大的值,使得每個節點上同時執行的Map和Reduce任務數增加,從而縮短執行的時間,提高整體的效能。

八、MapReduce工作流

1.複雜的Map和Reduce函式

Map和Reduce都繼承自MapReduce自己定義好的Mapper和Reducer基類,MapReduce框架根據使用者繼承Mapper和Reducer後的衍生類和類中覆蓋的核心函式來識別使用者定義的Map處理階段和Reducec處理階段,所以只有使用者繼承這些類並且事先其中的核心函式,提交到MapReduce框架上的作業才能按照使用者的意願被解析出來並執行。

1)setup函式

setup函式是在task啟動開始就呼叫的。setup函式在task啟動之後資料處理之前只調用一次,而覆蓋的Map函式或Reduce函式會針對輸入分片中的每個key呼叫一次。所以setup函式可以看作task上的一個全域性處理,而不像在Map函式或Reduce函式中,處理只對當前輸入分片中的正在處理資料產生作用。利用setup函式的特性,可以將Map或Reduce函式中的重複處理放置在setup函式中,可以將Map或Reduce函式處理過程中可能使用到的全域性變數進行初始化,或從作業資訊中獲取全域性變數,還可以監控task的啟動。

需要注意的是:呼叫setup函式只是對應task上的全域性操作,而不是整個作業的全域性操作。

2)cleanup函式

cleanup函式跟setup函式相似,不同之處在於cleanup函式是在task銷燬之前執行的,它的作用和setup也相似,區別僅在於它的啟動處於task銷燬之前。

3)run函式

此函式是Map類和Reduce類的自動方法:先呼叫setup函式,然後針對每個key呼叫一次Map函式或Reduce函式,最後銷燬task之前再呼叫cleanup函式。這個run函式將Map階段和Reduce階段的程式碼過程呈現出來。

2.MapReduce Job中全域性共享資料

在程式設計過程中全域性變數的使用時不可避免的,但是在MapReduce中直接使用程式碼級別的全域性變數是不現實的。這主要是應為繼承Mapper基類的Map階段類的執行和繼承Reduce基類的Reduce階段類的執行都是獨立的,並不像程式碼看起來的那樣會共享同一個Java虛擬機器的資源。

1)讀寫HDFS檔案

在MapReduce框架中,Map task和Reduce task都執行在Hadoop叢集的節點上,所以Map task和Reduce task、甚至不同的Job都可以通過讀寫HDFS中預定好的同一個檔案來實現全域性共享資料。需要注意的是,針對多個Map和Reduce的寫操作會產生衝突,覆蓋原有資料。

這種方法的優點是能夠實現讀寫、也比較直觀;而缺點是要共享一些很小的全域性資料也需要使用I/O,這將佔用系統資源、增加作業完成的資源消耗。

2)配置Job屬性

在配置MapReduce執行過程中,task可以讀取Job的屬性,可以在任務啟動之初利用Configuration類中的set(String name,String value)將一些簡單的全域性資料封裝到作業的配置屬性中,然後在task中再利用Configuration類中的get(String name)獲取配置到屬性中的全域性資料。這種方法的有優點是簡單,資源消耗小,缺點是對量比較大的共享資料顯得比較無力。

3)使用DistributedCache

DistributedCache是MapReduce為應用提供快取檔案的只讀工具,它可以快取檔案文字檔案、壓縮檔案和jar檔案等。在使用時,使用者可以在作業配置時使用本地或HDFS檔案的URL來將其設定成共享快取檔案。在作業啟動之後和task啟動之前,MapReduce框架會將可能需要的快取檔案複製到執行任務節點的本地。這種方法的優點是每個Job共享檔案只會在啟動之後複製一次,並且它適用於大量的共享資料;而缺點是它是隻讀的。

如何使用DistrubutedCache的示例:

  • 將要快取的檔案複製到HDFS上
  • 啟用作業的屬性配置,並設定待快取檔案
  • 在Map函式中使用DistrubutedCache

3.連結MapReduce Job

1).線性MapRecuce Job流

MapReduce Job也是一個程式,作為程式就是將輸入經過處理再輸出。所以在處理複雜問題的時候,如果一個Job不能完成,最簡單的辦法就是設定多個有一定順序的Job,每個Job以前有一個Job的輸出作為輸入,經過處理,將資料再輸出到下一個Job中。這樣Job流就能按照預定的程式碼處理資料,達到預期的目的。這種辦法的具體實現非常簡單:將每個Job的啟動程式碼設定成只有上一個Job結束之後才執行,然後將Job的輸入設定成上一個Job的輸出路徑。

2)複雜的MapReduce Job流

處理過程中資料流並不是簡單的線性流。這種情況下,MapReduce框架提供了讓使用者將Job組織成複雜Job流的API——ControlledJob類和JobControl類(這兩個類屬於org.apache.hadoop.mapreduce.lib.jobcontrl包)。具體做法:先按照正常情況配置各個Job,配置完成後再將各個Job封裝到對應的ControlledJob物件中,然後使用ControlledJob的addDependingJob()設定依賴關係,接著再例項化一個JobControl物件,並使用addJob()方法將所有的Job注入JobControl物件中,最後使用JobControl物件的run方法啟動Job流。

3)Job設定預處理和後處理過程

在Job處理前和處理後需要做一些簡單地處理,這種情況使用第一種方法仍能解決,但是如果針對這些簡單的處理設定新的Job類處理稍顯笨拙,這裡通過在Job前或後連結Map過程來解決預處理和後處理。具體是通過MapReduce中org.apache.hadoop.mapred.lib包下的ChainMapper和ChainReducer兩個靜態類來實現的,這種方法最終形成的是一個獨立的Job,而不是Job流,並且只有針對Job的輸入輸出流,各個階段函式之間的輸入輸出MapReduce框架會自動組織。

需要注意的是,ChainMapper和ChainReducer只支援舊的API,即Map和Reduce必須是實現org.apache.hadoop.mapred.Mapper介面的靜態類。

參考資料:

《Hadoop實戰》第二版