1. 程式人生 > >大資料處理神器map-reduce實現(僅python和shell版本)

大資料處理神器map-reduce實現(僅python和shell版本)

熟悉java的人直接可以使用java實現map-reduce過程,而像我這種不熟悉java的怎麼辦?為了讓非java程式設計師方便處理資料,我把使用python,shell實現streaming的過程,也即為map-reduce過程,整理如下:


1.如果資料不在hive裡面,而在hdfs中,可以使用下面的方式處理:

python版本的map-reduce

test.py

#!/bin/python
import sys 
from operator import itemgetter
from itertools import groupby


def mapper():
f = sys.stdin
    for line in f:
    line_list = line.strip().split("分隔符")
    #coding  這裡碼碼

把內容變成key-value的形式,方便reduec的時候按key聚合


def read_input(file):
    for line in file:
        yield line.strip().split("分隔符")
        
def reducer():
    data = read_input(sys.stdin)
    for key, kvalue in groupby(data, itemgetter(0)):
        temp_list = [ele[1] for ele in kvalue]

#coding  這裡碼碼實現想要的聚合效果



if __name__ == '__main__':
    flag = sys.argv[1]
    if flag == "map":
        mapper()
    elif flag == "reduce":
        reducer()
    else:

    print "error param"       


線下測試的時候使用命令:cat   filename | python test.py map | sort -k1 | python test.py reduce > text


shell版的map和python版的reduce一塊處理資料,重複利用兩種指令碼處理資料的優勢,當然這也是個人習慣而已

shell版的map

     test_map.sh

cat | awk ''  

使用cat  接收檔案,當然也可以不用,個人習慣,然後使用awk等出來資料,把處理的單行資料輸出

python的reduce

reduce_test.py

#!/bin/python
import sys 
from operator import itemgetter
from itertools import groupby

def read_input(file):
"""Read input and split."""
    for line in file:
        yield line.rstrip().split('\t')
        
def reducer():
    data = read_input(sys.stdin)
    for key, kviter in groupby(data, itemgetter(0)):
        temp_list = [ele[1] for ele in kviter]

#coding  這裡碼碼實現想要的聚合效果


if __name__ == '__main__':
    flag = sys.argv[1]
    if flag == "reduce":
        reducer()
    else:

print "error param"       


2.如果資料在hive裡面:

2.1這種情況也可以使用上面的方式處理

2.2可以寫hql使用transfrom..... using 的方式處理

例如:

add file  test.py; //上傳檔案到叢集節點

select transfrom(a,b,c)  using 'python test.py' as (a1,b1) from test_table where partition_name="123";

或者

select transfrom(a,b,c)  using 'cat | awk ....' as (a1,b1) from test_table where partition_name="123";


當然,實現這些功能已經完成了大部分工作,還有一些情況可能需要考慮優化技術,比如因為一些原因導致的資料不均衡,使reduce計算過程中出現某個或某幾個計算節點資料特別多,造成整體計算時間延長,更甚導致任務失敗。