1. 程式人生 > >hadoop 兩表join處理方法

hadoop 兩表join處理方法

 1. 概述

在傳統資料庫(如:MYSQL)中,JOIN操作是非常常見且非常耗時的。而在HADOOP中進行JOIN操作,同樣常見且耗時,由於Hadoop的獨特設計思想,當進行JOIN操作時,有一些特殊的技巧。

本文首先介紹了Hadoop上通常的JOIN實現方法,然後給出了幾種針對不同輸入資料集的優化方法。

2. 常見的join方法介紹

假設要進行join的資料分別來自File1和File2.

2.1 reduce side join

reduce side join是一種最簡單的join方式,其主要思想如下:

在map階段,map函式同時讀取兩個檔案File1和File2,為了區分兩種來源的key/value資料對,對每條資料打一個標籤(tag),比如:tag=0表示來自檔案File1,tag=2表示來自檔案File2。即:map階段的主要任務是對不同檔案中的資料打標籤。

在reduce階段,reduce函式獲取key相同的來自File1和File2檔案的value list, 然後對於同一個key,對File1和File2中的資料進行join(笛卡爾乘積)。即:reduce階段進行實際的連線操作。

2.2 map side join

之所以存在reduce side join,是因為在map階段不能獲取所有需要的join欄位,即:同一個key對應的欄位可能位於不同map中。Reduce side join是非常低效的,因為shuffle階段要進行大量的資料傳輸。

Map side join是針對以下場景進行的優化:兩個待連線表中,有一個表非常大,而另一個表非常小,以至於小表可以直接存放到記憶體中。這樣,我們可以將小表複製多份,讓每個map task記憶體中存在一份(比如存放到hash table中),然後只掃描大表:對於大表中的每一條記錄key/value,在hash table中查詢是否有相同的key的記錄,如果有,則連線後輸出即可。

為了支援檔案的複製,Hadoop提供了一個類DistributedCache,使用該類的方法如下:

(1)使用者使用靜態方法DistributedCache.addCacheFile()指定要複製的檔案,它的引數是檔案的URI(如果是HDFS上的檔案,可以這樣:hdfs://jobtracker:50030/home/XXX/file)。JobTracker在作業啟動之前會獲取這個URI列表,並將相應的檔案拷貝到各個TaskTracker的本地磁碟上。(2)使用者使用DistributedCache.getLocalCacheFiles()方法獲取檔案目錄,並使用標準的檔案讀寫API讀取相應的檔案。

2.3 SemiJoin

SemiJoin,也叫半連線,是從分散式資料庫中借鑑過來的方法。它的產生動機是:對於reduce side join,跨機器的資料傳輸量非常大,這成了join操作的一個瓶頸,如果能夠在map端過濾掉不會參加join操作的資料,則可以大大節省網路IO。

實現方法很簡單:選取一個小表,假設是File1,將其參與join的key抽取出來,儲存到檔案File3中,File3檔案一般很小,可以放到記憶體中。在map階段,使用DistributedCache將File3複製到各個TaskTracker上,然後將File2中不在File3中的key對應的記錄過濾掉,剩下的reduce階段的工作與reduce side join相同。

2.4 reduce side join + BloomFilter

在某些情況下,SemiJoin抽取出來的小表的key集合在記憶體中仍然存放不下,這時候可以使用BloomFiler以節省空間。

BloomFilter最常見的作用是:判斷某個元素是否在一個集合裡面。它最重要的兩個方法是:add() 和contains()。最大的特點是不會存在false negative,即:如果contains()返回false,則該元素一定不在集合中,但會存在一定的true negative,即:如果contains()返回true,則該元素一定可能在集合中。

因而可將小表中的key儲存到BloomFilter中,在map階段過濾大表,可能有一些不在小表中的記錄沒有過濾掉(但是在小表中的記錄一定不會過濾掉),這沒關係,只不過增加了少量的網路IO而已。

3. 二次排序

在Hadoop中,預設情況下是按照key進行排序,如果要按照value進行排序怎麼辦?即:對於同一個key,reduce函式接收到的value list是按照value排序的。這種應用需求在join操作中很常見,比如,希望相同的key中,小表對應的value排在前面。

有兩種方法進行二次排序,分別為:buffer and in memory sort和 value-to-key conversion。

對於buffer and in memory sort,主要思想是:在reduce()函式中,將某個key對應的所有value儲存下來,然後進行排序。 這種方法最大的缺點是:可能會造成out of memory。

對於value-to-key conversion,主要思想是:將key和部分value拼接成一個組合key(實現WritableComparable介面或者呼叫setSortComparatorClass函式),這樣reduce獲取的結果便是先按key排序,後按value排序的結果,需要注意的是,使用者需要自己實現Paritioner,以便只按照key進行資料劃分。Hadoop顯式的支援二次排序,在Configuration類中有個setGroupingComparatorClass()方法,可用於設定排序group的key值,具體參考:

4. 後記

最近一直在找工作,由於簡歷上寫了熟悉Hadoop,所以幾乎每個面試官都會問一些Hadoop相關的東西,而 Hadoop上Join的實現就成了一道必問的問題,而極個別公司還會涉及到DistributedCache原理以及怎樣利用DistributedCache進行Join操作。為了更好地應對這些面試官,特整理此文章。

O(∩_∩)O哈哈~

5. 參考資料

(1) 書籍《Data-Intensive Text Processing with MapReduce》 page 60~67 Jimmy Lin and Chris Dyer,University of Maryland, College Park

(2) 書籍《Hadoop In Action》page 107~131