1. 程式人生 > >Hadoop分散式環境下的資料抽樣

Hadoop分散式環境下的資料抽樣

1. 問題由來

Google曾經有一道非常經典的面試題:

給你一個長度為N的連結串列。N很大,但你不知道N有多大。你的任務是從這N個元素中隨機取出k個元素。你只能遍歷這個連結串列一次。你的演算法必須保證取出的元素恰好有k個,且它們是完全隨機的(出現概率均等)?

這道題的解法非常多,網上討論也非常熱烈。本文要討論的是,這個問題是從何而來,有什麼實用價值?

自從有了Hadoop之後,該問題便有了新的應用載體。隨著資料量的增多,很多資料探勘演算法被轉移到MapReduce上實現,而資料探勘中有個基本的問題是怎樣對資料進行抽樣。在Hadoop中,每個job會被分解成多個task平行計算,而資料的總量事先是不知道的(知道job執行結束才能獲取數總數,而資料量非常大時,掃描一遍資料的代價非常高),使用者知道的只是要獲取的樣本量,那怎樣在類似於Hadoop的分散式平臺上進行資料抽樣?

回過頭來看google的這道面試題,是不是正好時Hadoop平臺上海量資料抽樣問題?

2. 在Hadoop上編寫抽樣程式

2.1 解法一

(1) 設計思想

蓄水池抽樣:先儲存前k個元素, 從第k+1個元素開始, 以k/i (i=k+1, k+2,…,N) 的概率選中第i個元素,並隨機替換掉一個已儲存的記錄,這樣遍歷一次得到k個元素,可以保證完全隨機選取。

(2) MapReduce實現

要實現該抽樣演算法,只需編寫Mapper即可。在Map函式中,使用者定義一個vector儲存選中的k個元素,待掃描完所有元素後,在解構函式中將vector中的資料寫到磁碟中。

使用者執行job時,需指定每個map task的取樣量。比如,該job的map task個數為s,則每個map task需要採集k/s個元素。

(3) 優缺點分析

由於該job沒有reduce task,因而效率很高。然而,該方法選中某個元素的概率並不完全是k/N!

2.2 解法二

(1) 設計思想

依次掃描每個元素,為每個元素賦予一個隨機的整數值;然後使用Top K演算法(譬如最大K個整數)得到需要的K個元素。

(2) MapReduce實現

要實現該演算法,使用者需要編寫mapper和reducer,在map函式中,為每個元素賦予一個隨機數,將該隨機數作為key,並將key最大的前k個儲存下來(可使用小頂堆)在reduce函式中,唯一的reduce task輸出前k個元素。

(3) 優缺點分析

該演算法比第一種演算法低效,但由於整個過程自然流暢,實現起來非常簡單,不易出錯。

2.3 解法三

(1) 設計思想

考慮第一個元素,其以K/N的概率被選中;如果該節點被選中,則從剩下的(N-1)個元素中選出(K-1)個元素;如果沒有被選中,則從剩下的(N-1)個元素中選出K個元素,…,依次這樣下去,直到獲取K個元素。

(2) MapReduce實現

使用者只需編寫Mapper即可。首先要獲取每個map task輸入的資料量,這個可以在InputFormat中計算得到。然後,在每個map函式中,採集k/s(其中s為map task資料量)個元素。

(3) 優缺點分析

由於該演算法沒有reduce task,效率比較高,但需要在InputFormat中統計資料量,程式設計複雜度較高。

3. 延伸

這個問題與《程式設計珠璣》上討論的問題很相似:

輸入兩個整數m和n,其中m<n。輸出是0~n-1範圍內m個隨機整數的有序表,不允許重複。

對於該問題,大致存在四種演算法,他們有不同的優缺點。

(1) 第一種方法來自Knuth的《The art of Computer Programming, Volume 2: Seminumerical Algorithms》

虛擬碼是:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 select = m remaining = n for I = [0 n ) if(bigrand() % remaining) < select print i select— remaining—

只要m<=n,程式選出來的整數就恰為m個。

C++的實現如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 void genKnuth(int m, int n) { for(inti = 0; i < n; i++) { if(bigrand() % (n - i) < m) { cout <<i << endl; m--; } } }

該演算法非常節省空間,但需要全部掃描n個數,當n很多時,效率不高。

(2)第二種方法的複雜度只與m有關,採用了set(實際上是紅黑樹)節省時間。程式碼如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 void gensets(int m,intn) { set<int> S; while(S.size() < m) { S.insert(bigrand() % n); } // print S }

該方法每次插入均在O(log m)時間內完成,但需要的空間開銷很大。

(3)第三種方法克服了(2)的缺點,程式碼如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 void genshuf(int m,intn) { inti, j; int*x =new int[n]; for(i = 0; i < n; i++) { x[i] = i; } for(i = 0; i < m; i++) { j = randint(i, n-1); intt = x[i]; x[i] = x[j]; x[j] = x; } sort(x, x+m); //print result }

該演算法需要n個元素的記憶體空間和O(n+mlogm)的時間,其效能通常不吐Knuth的演算法。

(4)當m接近n時,基於集合的演算法生成的很多隨機數都要丟掉,因為之前的數已經存在於集合中了,為了改進這一點,演算法如下:

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 void genfloyd(int m, int n) { set<int> S; set<int>::iterator i; for(intj=n-m; j < n; j++) { intt = bigrand()%(j+1); if(S.find(t) == S.end()){ S.insert(t);// t not in S }else{ S.insert(j);// t in S } } //print results }

4. 參考資料

(1) 《程式設計珠璣》第二版