1. 程式人生 > >使用python寫一個最基本的mapreduce程序

使用python寫一個最基本的mapreduce程序

sheng words == reducer logs 註意 例子 one split

一個mapreduce程序大致分成三個部分,第一部分是mapper文件,第二個就是reducer文件,第三部分就是使用hadoop command 執行程序。

在這個過程中,困惑我最久的一個問題就是在hadoop command中hadoop-streaming 也就是streaming jar包的路徑。

路徑大概是這樣的:

cd ~
cd /usr/local/hadoop-2.7.3/share/hadoop/tools/lib
#在這個文件下,我們可以找到你 hadoop-streaming-2.7.3.jar

這個路徑是參考的這裏

這個最基本的mapreduce程序我主要參考了三個博客:

第一個-主要是參考這個博客的mapper和reducer的寫法-在這個博客中它在練習中給出了只寫mapper執行文件的一個例子

第二個博客-主要參考的這個博客的runsh的寫法

第三個博客-主要是參考這個博客的將本地文件上傳到hdfs文件系統中

首先對於mapper文件
mapper.py

#!/usr/bin/env python  
  
import sys  
  
# input comes from STDIN (standard input)  
for line in sys.stdin:  
    # remove leading and trailing whitespace  
    line = line.strip()  
    # split the line into words  
    words = line.split()  
    # increase counters  
    for word in words:  
        # write the results to STDOUT (standard output);  
        # what we output here will be the input for the  
        # Reduce step, i.e. the input for reducer.py  
        #  
        # tab-delimited; the trivial word count is 1  
        print '%s\t%s' % (word, 1)

#上面這個文件我們得到的結果大概是每個單詞對應一個數字1

對於reducer文件:reducer.py

#!/usr/bin/env python  
  
from operator import itemgetter  
import sys  
  
current_word = None  
current_count = 0  
word = None  
  
# input comes from STDIN  
for line in sys.stdin:  
    # remove leading and trailing whitespace  
    line = line.strip()  
  
    # parse the input we got from mapper.py  
    word, count = line.split('\t', 1)  
  
    # convert count (currently a string) to int  
    try:  
        count = int(count)  
    except ValueError:  
        # count was not a number, so silently  
        # ignore/discard this line  
        continue  
  
    # this IF-switch only works because Hadoop sorts map output  
    # by key (here: word) before it is passed to the reducer  
    if current_word == word:  
        current_count += count  
    else:  
        if current_word:  
            # write result to STDOUT  
            print '%s\t%s' % (current_word, current_count)  
        current_count = count  
        current_word = word  
  
# do not forget to output the last word if needed!  
if current_word == word:  
    print '%s\t%s' % (current_word, current_count)

對上面兩個代碼先進行一個本地的檢測

vim test.txt
foo foo quux labs foo bar quux
cat test.txt|python mapper.py

cat test.txt|python mapper.py|sort|python reducer.py
##註意在這裏我們執行萬mapper之後我們進行了一個排序,所以對於相同單詞是處於相鄰位置的,這樣在執行reducer文件的時候代碼可以寫的比較簡單一點

然後在hadoop集群中跑這個代碼

首先講這個test.txt 上傳到相應的hdfs文件系統中,使用的命令模式如下:

hadoop fs -put ./test.txt /dw_ext/weibo_bigdata_ugrowth/mds/

然後寫一個run.sh


HADOOP_CMD="/usr/local/hadoop-2.7.3/bin/hadoop"  # hadoop的bin的路徑
STREAM_JAR_PATH="/usr/local/hadoop-2.7.3/share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar"  ## streaming jar包的路徑

INPUT_FILE_PATH="/dw_ext/weibo_bigdata_ugrowth/mds/src.txt" #hadoop集群上的資源輸入路徑
#需要註意的是intput文件必須是在hadooop集群上的hdfs文件中的,所以必須將本地文件上傳到集群上
OUTPUT_PATH="/dw_ext/weibo_bigdata_ugrowth/mds/output"
#需要註意的是這output文件必須是不存在的目錄,因為我已經執行過一次了,所以這裏我把這個目錄通過下面的代碼刪掉

$HADOOP_CMD fs -rmr  $OUTPUT_PATH

$HADOOP_CMD jar $STREAM_JAR_PATH     -input $INPUT_FILE_PATH     -output $OUTPUT_PATH     -mapper "python mapper.py"     -reducer "python reducer.py"     -file ./mapper.py     -file ./reducer.py

# -mapper:用戶自己寫的mapper程序,可以是可執行文件或者腳本
# -reducer:用戶自己寫的reducer程序,可以是可執行文件或者腳本
# -file:打包文件到提交的作業中,可以是mapper或者reducer要用的輸入文件,如配置文件,字典等。

明天看這個
https://www.cnblogs.com/shay-zhangjin/p/7714868.html
https://www.cnblogs.com/kaituorensheng/p/3826114.html

使用python寫一個最基本的mapreduce程序