大資料處理神器map-reduce實現(僅python和shell版本)
熟悉java的人直接可以使用java實現map-reduce過程,而像我這種不熟悉java的怎麼辦?為了讓非java程式設計師方便處理資料,我把使用python,shell實現streaming的過程,也即為map-reduce過程,整理如下:
1.如果資料不在hive裡面,而在hdfs中,可以使用下面的方式處理:
python版本的map-reduce
test.py
#!/bin/python
import sys
from operator import itemgetter
from itertools import groupby
def mapper():
f = sys.stdin
for line in f:
line_list = line.strip().split("分隔符")
#coding 這裡碼碼
def read_input(file):
for line in file:
yield line.strip().split("分隔符")
def reducer():
data = read_input(sys.stdin)
for key, kvalue in groupby(data, itemgetter(0)):
temp_list = [ele[1] for ele in kvalue]
#coding 這裡碼碼實現想要的聚合效果
if __name__ == '__main__':
flag = sys.argv[1]
if flag == "map":
mapper()
elif flag == "reduce":
reducer()
else:
print "error param"
線下測試的時候使用命令:cat filename | python test.py map | sort -k1 | python test.py reduce > text
shell版的map和python版的reduce一塊處理資料,重複利用兩種指令碼處理資料的優勢,當然這也是個人習慣而已
shell版的map
test_map.sh
cat | awk ''
使用cat 接收檔案,當然也可以不用,個人習慣,然後使用awk等出來資料,把處理的單行資料輸出
python的reduce
reduce_test.py
#!/bin/python
import sys
from operator import itemgetter
from itertools import groupby
def read_input(file):
"""Read input and split."""
for line in file:
yield line.rstrip().split('\t')
def reducer():
data = read_input(sys.stdin)
for key, kviter in groupby(data, itemgetter(0)):
temp_list = [ele[1] for ele in kviter]
#coding 這裡碼碼實現想要的聚合效果
if __name__ == '__main__':
flag = sys.argv[1]
if flag == "reduce":
reducer()
else:
print "error param"
2.如果資料在hive裡面:
2.1這種情況也可以使用上面的方式處理
2.2可以寫hql使用transfrom..... using 的方式處理
例如:
add file test.py; //上傳檔案到叢集節點
select transfrom(a,b,c) using 'python test.py' as (a1,b1) from test_table where partition_name="123";
或者
select transfrom(a,b,c) using 'cat | awk ....' as (a1,b1) from test_table where partition_name="123";
當然,實現這些功能已經完成了大部分工作,還有一些情況可能需要考慮優化技術,比如因為一些原因導致的資料不均衡,使reduce計算過程中出現某個或某幾個計算節點資料特別多,造成整體計算時間延長,更甚導致任務失敗。