Python 玩轉大資料 Mapreduce開發 wordcount
阿新 • • 發佈:2018-12-17
一 介紹
MapReduce 是一種分散式程式設計模型,用於處理大規模的資料。使用者主要通過指定一個 map 函式和一個 reduce 函式來處理一個基於key/value pair的資料集合,輸出中間的基於key/value pair的資料集合;然後 再建立一個Reduce函式用來合併所有的具有相同中間key值的中間value值。
使用python寫MapReduce的“訣竅”是利用Hadoop流的API,通過STDIN(標準輸入),STDOUT(標準輸出)在Map函式和Reduce函式之間傳遞資料。 我們唯一需要做的是利用Python的sys.stdin讀取輸入資料,並把我們的輸出傳送給sys.stdout。Hadoop流將會幫助我們處理別的任何事情。
二 mapreduce程式
1)使用者編寫的python程式分成2個部分:Mapper,Reducer;然後將程式提交到叢集上跑 (2)Mapper的輸入資料是KV對的形式(KV的型別可自定義) (3)Mapper的輸出資料是KV對的形式(KV的型別可自定義) (4)Mapper中的業務邏輯寫在map()方法中 (5)map()方法(maptask程序)對每一個<K,V>呼叫一次 (6)Reducer的輸入資料型別對應Mapper的輸出資料型別,也是KV (7)Reducer的業務邏輯寫在reduce()方法中 (8)Reducetask程序對每一組相同k的<k,v>組呼叫一次reduce()方法
# !/usr/bin/python
# 第一行一定要註明python執行位置,否則需要在執行python程式時,在程式前面加上python命令
# -*- coding: utf-8 -*-
# @Time : 2018/10/25 下午11:42
# @Author : Einstein Yang!!
# @Nickname : 穿著開襠褲上大學
# @FileName: map.py.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog :https://blog.csdn.net/weixin_41734687
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN (standard input)
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# split the line into words while removing any empty strings
words = filter(lambda word: word, line.split())
# increase counters
for word in words:
# write the results to STDOUT (standard output);
# what we output here will be the input for the
# Reduce step, i.e. the input for reducer.py
#
# tab-delimited; the trivial word count is 1
print('%s\t%s' % (word, 1))
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Time : 2018/10/25 下午11:54
# @Author : Einstein Yang!!
# @Nickname : 穿著開襠褲上大學
# @FileName: reduce.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog :https://blog.csdn.net/weixin_41734687
from operator import itemgetter
import sys
# maps words to their counts
word2count = {}
# input comes from STDIN
for line in sys.stdin:
# remove leading and trailing whitespace
line = line.strip()
# parse the input we got from mapper.py
word, count = line.split()
# convert count (currently a string) to int
try:
count = int(count)
word2count[word] = word2count.get(word, 0) + count
except ValueError:
# count was not a number, so silently
# ignore/discard this line
pass
# sort the words lexigraphically;
#
# this step is NOT required, we just do it so that our
# final output will look more like the official Hadoop
# word count examples
# 實現按字典key對word進行排序,這樣輸出的結果是有序的
sorted_word2count = sorted(word2count.items(), key=itemgetter(0))
# write the results to STDOUT (standard output)
for word, count in sorted_word2count:
print('%s\t%s' % (word, count))
本地測試
cat wordcount.csv | python map.py |sort -k 1|python reduce.py
把程式碼放到叢集上跑
# 提交叢集完整程式碼例項,下面用shell啟動
# /root/apps/hadoop-2.6.4/bin/hadoop jar /root/apps/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar -mapper map.py -reducer reduce.py -input /data/data_coe/data_asset/bigdata/*.csv -output /data/data_coe/data_asset/bigdata/output -file /root/Desktop/map.py -file /root/Desktop/reduce.py
HADOOP_CMD="/root/apps/hadoop-2.6.4/bin/hadoop"
STREAM_JAR_PATH="/root/apps/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar"
INPUT_FILE_PATH="/data/data_coe/data_asset/bigdata/*.csv"
OUTPUT_PATH="/data/data_coe/data_asset/bigdata/output"
hdfs dfs -rmr $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python map.py" \
-reducer "python reduce.py" \
-file /root/Desktop/map.py \
-file /root/Desktop/reduce.py
指令碼解釋
HADOOP_CMD: hadoop的bin的路徑
STREAM_JAR_PATH:streaming jar包的路徑
INPUT_FILE_PATH:hadoop叢集上的資源輸入路徑
OUTPUT_PATH:hadoop叢集上的結果輸出路徑。(注意:這個目錄不應該存在的,因此在指令碼加了先刪除這個目錄。**注意****注意****注意**:若是第一次執行,沒有這個目錄,會報錯的。可以先手動新建一個新的output目錄。)
$HADOOP_CMD fs -rmr $OUTPUT_PATH
$HADOOP_CMD jar $STREAM_JAR_PATH \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
# 若干map.py第一行指定了執行的python路徑,即第一行寫了!/usr/bin/python,可以以以下方式執行 -mapper map.py -reducer reduce.py
-mapper "python map.py" \
-reducer "python reduce.py" \
-file ./map.py \
-file ./reduce.py #這裡固定格式,指定輸入,輸出的路徑;指定mapper,reducer的檔案;並分發mapper,reducer角色的我們使用者寫的程式碼檔案,因為叢集其他的節點還沒有mapper、reducer的可執行檔案。