1. 程式人生 > >Python 玩轉大資料 Mapreduce開發 wordcount

Python 玩轉大資料 Mapreduce開發 wordcount

一 介紹

MapReduce 是一種分散式程式設計模型,用於處理大規模的資料。使用者主要通過指定一個 map 函式和一個 reduce 函式來處理一個基於key/value pair的資料集合,輸出中間的基於key/value pair的資料集合;然後 再建立一個Reduce函式用來合併所有的具有相同中間key值的中間value值。

使用python寫MapReduce的“訣竅”是利用Hadoop流的API,通過STDIN(標準輸入),STDOUT(標準輸出)在Map函式和Reduce函式之間傳遞資料。 我們唯一需要做的是利用Python的sys.stdin讀取輸入資料,並把我們的輸出傳送給sys.stdout。Hadoop流將會幫助我們處理別的任何事情。

二 mapreduce程式

1)使用者編寫的python程式分成2個部分:Mapper,Reducer;然後將程式提交到叢集上跑 (2)Mapper的輸入資料是KV對的形式(KV的型別可自定義) (3)Mapper的輸出資料是KV對的形式(KV的型別可自定義) (4)Mapper中的業務邏輯寫在map()方法中 (5)map()方法(maptask程序)對每一個<K,V>呼叫一次 (6)Reducer的輸入資料型別對應Mapper的輸出資料型別,也是KV (7)Reducer的業務邏輯寫在reduce()方法中 (8)Reducetask程序對每一組相同k的<k,v>組呼叫一次reduce()方法

# !/usr/bin/python
# 第一行一定要註明python執行位置,否則需要在執行python程式時,在程式前面加上python命令
# -*- coding: utf-8 -*-
# @Time    : 2018/10/25 下午11:42
# @Author  : Einstein Yang!!
# @Nickname : 穿著開襠褲上大學
# @FileName: map.py.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog    :https://blog.csdn.net/weixin_41734687




import
sys # maps words to their counts word2count = {} # input comes from STDIN (standard input) for line in sys.stdin: # remove leading and trailing whitespace line = line.strip() # split the line into words while removing any empty strings words = filter(lambda word: word, line.split()) # increase counters for word in words: # write the results to STDOUT (standard output); # what we output here will be the input for the # Reduce step, i.e. the input for reducer.py # # tab-delimited; the trivial word count is 1 print('%s\t%s' % (word, 1))
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/10/25 下午11:54
# @Author  : Einstein Yang!!
# @Nickname : 穿著開襠褲上大學
# @FileName: reduce.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog    :https://blog.csdn.net/weixin_41734687



from operator import itemgetter
import sys

# maps words to their counts
word2count = {}

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()

    # parse the input we got from mapper.py
    word, count = line.split()
    # convert count (currently a string) to int
    try:
        count = int(count)
        word2count[word] = word2count.get(word, 0) + count
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        pass

# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
# 實現按字典key對word進行排序,這樣輸出的結果是有序的
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))

# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
    print('%s\t%s' % (word, count))

本地測試

cat wordcount.csv | python map.py |sort -k 1|python reduce.py

把程式碼放到叢集上跑

# 提交叢集完整程式碼例項,下面用shell啟動
# /root/apps/hadoop-2.6.4/bin/hadoop jar /root/apps/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar -mapper map.py -reducer  reduce.py -input /data/data_coe/data_asset/bigdata/*.csv -output /data/data_coe/data_asset/bigdata/output -file /root/Desktop/map.py -file /root/Desktop/reduce.py
HADOOP_CMD="/root/apps/hadoop-2.6.4/bin/hadoop"
    STREAM_JAR_PATH="/root/apps/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar"
    
    INPUT_FILE_PATH="/data/data_coe/data_asset/bigdata/*.csv"
    OUTPUT_PATH="/data/data_coe/data_asset/bigdata/output"
    
    hdfs dfs -rmr  $OUTPUT_PATH 
    
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH \
        -output $OUTPUT_PATH \
        -mapper "python map.py" \
        -reducer "python reduce.py" \
        -file /root/Desktop/map.py \
        -file /root/Desktop/reduce.py

指令碼解釋

HADOOP_CMD: hadoop的bin的路徑
    STREAM_JAR_PATH:streaming jar包的路徑
    INPUT_FILE_PATH:hadoop叢集上的資源輸入路徑
    OUTPUT_PATH:hadoop叢集上的結果輸出路徑。(注意:這個目錄不應該存在的,因此在指令碼加了先刪除這個目錄。**注意****注意****注意**:若是第一次執行,沒有這個目錄,會報錯的。可以先手動新建一個新的output目錄。)
    $HADOOP_CMD fs -rmr  $OUTPUT_PATH
    
    $HADOOP_CMD jar $STREAM_JAR_PATH \
        -input $INPUT_FILE_PATH \
        -output $OUTPUT_PATH \
        # 若干map.py第一行指定了執行的python路徑,即第一行寫了!/usr/bin/python,可以以以下方式執行 -mapper map.py -reducer reduce.py
        -mapper "python map.py" \
        -reducer "python reduce.py" \
        -file ./map.py \
        -file ./reduce.py                 #這裡固定格式,指定輸入,輸出的路徑;指定mapper,reducer的檔案;並分發mapper,reducer角色的我們使用者寫的程式碼檔案,因為叢集其他的節點還沒有mapper、reducer的可執行檔案。