1. 程式人生 > >【Hadoop】 c++ && python 實現 Hadoop Streaming 的 partitioner 和 模組化

【Hadoop】 c++ && python 實現 Hadoop Streaming 的 partitioner 和 模組化

這些東西是我自己的理解,如果有錯誤的地方,或者有哪些地方走了彎路,請幫我指出我的錯誤,謝謝!
Hadoop Streaming 是一個工具, 代替編寫Java的實現類,而利用可執行程式來完成map-reduce過程.
工作流程:InputFile --> mappers --> [Partitioner] --> reducers --> outputFiles
理解: 
1 輸入檔案,可以是指定遠端檔案系統目錄(*代表所有檔案)
2 通過叢集自己分解到各個PC上,每個mapper是一個可執行檔案,相應的啟動一個程序,來實現你的邏輯
3 mapper的輸入為標準輸入,所以,任何能夠支援標準輸入的可執行的東西,c,c++(編譯出來的可執行檔案),python,......都可以作為mapper 和 reducer mapper的輸出為標準輸出,如果有Partitioner,就給它,如果沒有,它的輸出將作為reducer的輸入
4 Partitioner 為可選的項,二次排序,可以對結果進行分類打到結果檔案裡面,它的輸入是mapper的標準輸出,它的輸出,將作為reducer的標準輸入
5 reducer 同 mapper
6 輸出資料夾,在遠端檔案不能重名

Hadoop Streaming

hadoop-streaming.jar 的位置 : $HADOOP_HOME/contrib/streaming 內,官方上面關於hadoop-streaming 的介紹已經很詳細了,而且也有了關於python的例子,我就不說了,這裡總結下自己的經驗
1 指定 mapper or reducer 的 task 官方上說要用 -jobconf 但是這個引數已經過時,不可以用了,官方說要用 -D, 注意這個-D是要作為最開始的配置出現的,因為是在maper 和 reducer 執行之前,就需要硬性指定好的,所以要出現在引數的最前面 ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D .........-input ........ 類似這樣,這樣,即使你程式最後只指定了一個輸出管道,但是還是會有你指定的task數量的結果檔案,只不過多餘的就是空的,實驗以下就知道了.
2 關於二次排序,由於是用的streaming 所以,在可執行檔案內,只能夠處理邏輯,還有就是輸出,當然我們也可以指定二次排序,但是由於是全部引數化,不是很靈活。比如:
10.2.3.40    1
11.22.33.33    1
www.renren.com 1
www.baidu.com    1
10.2.3.40    1
這樣一個很規整的輸入檔案,需求是要把記錄獨立的ip和url的count 但是輸出檔案要分分割出來。
官方網站的例子,是指定 key 然後對key 指定 主-key 和 key 用來排序,而 主-key 用來二次排序,這樣會輸出你想要的東西, 但是對於上面最簡單的需求,對於傳遞引數,我們如何做呢?
其實我們還是可以利用這一點,在我們mapper 裡面,還是按照/t來分割key value 但是我們要給key指定一個主-key 用來給Partitioner 來實現二次排序,所以我們可以稍微處理下這個KEY,我們可以簡單的判斷出來ip 和 url 的區別,這樣,我們就人為的加上一個主-key 我們在mapper裡面,給每個key人為的加上一個"標籤",用來給partitioner做二次排序用,比如我們的mapper的輸出是這樣
D&10.2.3.40    1
D&11.22.33.33    1
W&www.renren.com 1
W&www.baidu.com    1
D&10.2.3.40    1
然後通過傳遞命令引數
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner //指定要求二次排序
-jobconf map.output.key.field.separator='&' //這裡如果不加兩個單引號的話我的命令會死掉
-jobconf num.key.fields.for.partition=1 //這裡指第一個 & 符號來分割,保證不會出錯
這樣我們就可以通過 partitioner 來實現二次排序了
在reducer裡面,我們再把"標籤"摘掉(不費吹灰之力)就可以做到悄無聲息的完成二次排序了。

關於模組化

(強調 : 沒有在叢集上測試,只在單機上做測試)

程式設計師最悲劇的就是不能程式碼複用,做這個也一樣,用hadoop-streaming 也一樣,要做到程式碼重用,是我第一個考慮的問題
當我看到 -file(詳細可以看官方網站上的講解) 的時候,我就想到利用這個東西,果然,我的在本機上建立了一個py模組,簡單的一個函式
然後在我的mapper裡面import 它,本地測試通過後,利用-file 把模組所在的問價夾用 -file moudle/* 這個引數,傳入streaming
執行的結果毫無錯誤,這樣,我們就可以抽象出來一些模組的東西,來實現我們模組化的需求

注 : 不要忘記 chmod +x *.py  將py 變成可執行的,不然不可以執行
程式碼 :  1: 模組程式碼 mg.py 用來給 mapper 貼標籤

def mgFunction(line):         if(line[0] >= '0' and line[0] <= '9'):                 return "D&" + line         return "W&" + line

2: mapper.py

#!/usr/bin/env python import sys sys.path.append('/home/liuguoqing/Desktop/hadoop-0.19.2/moudle') import mg for line in sys.stdin:         line = mg.mgFunction(line)         line = line.strip() #       print line         words = line.split()         print '%st%s' % (words[0], words[1])

3: reducer.py

#!/usr/bin/env python
import sys
user_login_day = {}
for line in sys.stdin:
        line = line[2:]//去掉帽子
        line = line.strip()
        userid, day = line.split('t', 1)
        user_login_day[userid] = user_login_day.get(userid, 0) + 1
for uid in user_login_day.keys():
        print '%st%d' % (uid, user_login_day[uid])

這樣就實現了模組化的可以二次排序的hadoop-streaming,命令如下

   1: ./bin/hadoop jar hadoop-0.19.2-streaming.jar 
   2: #streaming jar
   3: -D mapred.reduce.tasks=2  
   4: #指定2個reduce來處理
   5: -input user_login_day-input2/*  
   6: #指定輸入檔案 可以用 dir/* 方式
   7: -output user_login_day-output102 
   8: #指定輸出資料夾
   9: -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py  
  10: #指定mapper 可執行檔案 我用全路徑,好像用相對路徑會出錯...
  11: -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py 
  12: #指定reducer 可執行檔案 
  13: -file ~/Desktop/hadoop-0.19.2/moudle/* 
  14: #指定模組化的庫檔案 dir/* 模式
  15: -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 
  16: #指定 partitioner 引數為class
  17: -jobconf map.output.key.field.separator='&' 
  18: #指定 主-key 的分割符號為 '&'
  19: -jobconf num.key.fields.for.partition=1 

#指定為第一個‘&’
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop jar hadoop-0.19.2-streaming.jar -D mapred.reduce.tasks=2 -input user_login_day-input2/* -output user_login_day-output102 -mapper ~/Desktop/hadoop-0.19.2/python/mapper/get_user_login_day_back.py -reducer ~/Desktop/hadoop-0.19.2/python/reducer/get_user_login_day_back.py -file ~/Desktop/hadoop-0.19.2/moudle/* -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner -jobconf map.output.key.field.separator='&' -jobconf num.key.fields.for.partition=1
10/01/24 03:19:15 WARN streaming.StreamJob: -jobconf option is deprecated, please use -D instead.
packageJobJar: [/home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.py, /home/liuguoqing/Desktop/hadoop-0.19.2/moudle/mg.pyc, /tmp/hadoop-liuguoqing/hadoop-unjar6780057097425964518/] [] /tmp/streamjob3100401358387519950.jar tmpDir=null
10/01/24 03:19:15 INFO mapred.FileInputFormat: Total input paths to process : 2
10/01/24 03:19:15 INFO streaming.StreamJob: getLocalDirs(): [/tmp/hadoop-liuguoqing/mapred/local]
10/01/24 03:19:15 INFO streaming.StreamJob: Running job: job_201001221008_0065
10/01/24 03:19:15 INFO streaming.StreamJob: To kill this job, run:
10/01/24 03:19:15 INFO streaming.StreamJob: /home/liuguoqing/Desktop/hadoop-0.19.2/bin/../bin/hadoop job  -Dmapred.job.tracker=hdfs://localhost:9881 -kill job_201001221008_0065
10/01/24 03:19:15 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201001221008_0065
10/01/24 03:19:16 INFO streaming.StreamJob:  map 0%  reduce 0%
10/01/24 03:19:17 INFO streaming.StreamJob:  map 33%  reduce 0%
10/01/24 03:19:18 INFO streaming.StreamJob:  map 67%  reduce 0%
10/01/24 03:19:19 INFO streaming.StreamJob:  map 100%  reduce 0%
10/01/24 03:19:27 INFO streaming.StreamJob:  map 100%  reduce 50%
10/01/24 03:19:32 INFO streaming.StreamJob:  map 100%  reduce 100%
10/01/24 03:19:32 INFO streaming.StreamJob: Job complete: job_201001221008_0065
10/01/24 03:19:32 INFO streaming.StreamJob: Output: user_login_day-output102
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -ls user_login_day-output102
Found 3 items
drwxr-xr-x   - liuguoqing supergroup          0 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/_logs
-rw-r--r--   1 liuguoqing supergroup         25 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00000
-rw-r--r--   1 liuguoqing supergroup         47 2010-01-24 03:19 /user/liuguoqing/user_login_day-output102/part-00001
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -cat user_login_day-output102/part-00000
54321    2
99999    1
12345    12
liuguoqing@liuguoqing-desktop:~/Desktop/hadoop-0.19.2$ ./bin/hadoop dfs -cat user_login_day-output102/part-00001
http://www.renren.com    3
http://www.baidu.com    3
以上為操作結果顯示

4 : c++ 的應用

只要寫兩個個標準輸入輸出的mapper reducer,然後
g++ mapper.cpp -o mapper
g++ reducer.cpp -o reducer
生成的兩個可執行的 mapper reducer 的檔案作為mapper 和 reducer 引數就可以了,執行的命令和上面是一樣的. 程式碼如下

   1: mapper.cpp
   2:  
   3: #include <stdio.h>
   4: #include <string>
   5: #include <iostream>
   6: using namespace std;
   7: int main(){
   8:         string key;
   9:         string value;
  10:         while(cin>>key){
  11:                 cin>>value;
  12:                 cout<<key<<"t"<<value<<endl;
  13:         }
  14:         return 0;
  15: }
  16:  
  17: reducer.cpp
  18:  
  19: #include <stdio.h>
  20: #include <string>
  21: #include <map>
  22: #include <iostream>
  23: using namespace std;
  24: int main(){
  25:         string key;
  26:         string value;
  27:         map<string, int> word2count;
  28:         map<string, int> :: iterator it;
  29:         while(cin>>key){
  30:                 cin>>value;
  31:                 it = word2count.find(key);
  32:                 if(it != word2count.end()){
  33:                         ++it->second;
  34:                 }
  35:                 else{
  36:                         word2count.insert(make_pair(key, 1));
  37:                         it->second = 0;
  38:                 }
  39:         }
  40:         for(it = word2count.begin(); it != word2count.end(); ++it){
  41:                 cout<<it->first<<"t"<<it->second<<endl;
  42:         }
  43:         return 0;
  44: }
  45:  

這樣就可以利用c++來編寫 hadoop map-reduce了。