1. 程式人生 > >使用Hadoop Streaming 完成MapReduce(Python程式碼)

使用Hadoop Streaming 完成MapReduce(Python程式碼)

一 Map和Reduce 

 

首先看下MR的工作原理

 

MapReduce的好處是它可以把在記憶體中不能完成的事轉變成可以在硬碟上高效完成。
Map-­‐Reduce 對於叢集的好處:
1,在多節點上冗餘地儲存資料,以保證資料的持續性和一直可取性
2, 將計算移向資料端,以最大程度減少資料移動
3,簡單的程式模型隱藏所有的複雜度

Map,Reduce一般的流程:
Map階段:
a, 逐個檔案逐行掃描
b, 掃描的同時抽取出我們感興趣的內容 (Keys)

Group by key
排序和洗牌
(Group by key階段會自動的執行,不需要自己寫)

Reduce階段:
a, 聚合 、 總結 、 過濾或轉換
b, 寫入結果

 

二  Hadoop Streaming原理

Hadoop 不僅可以使用Java進行MapReduce的編寫,也通過Hadoop Streaming的方式提供了其他語言編寫MR的介面。更重要的是,使用python來編寫MR,比使用親兒子Java編寫MR要更簡單和方便……所以在一些不非常複雜的任務中使用python來編寫MR比起使用Java,是更加划算的。

Hadoop streaming是Hadoop的一個工具, 它幫助使用者建立和執行一類特殊的map/reduce作業, 這些特殊的map/reduce作業是由一些可執行檔案或指令碼檔案充當mapper或者reducer。

比如可以使用python語言來寫map-reduce使用“Hadoop Streaming”來完成傳統mapreduce的功能。

$HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/hadoop-streaming.jar \
-input myInputDirs \
-output myOutputDir \
-mapper mapper.py \
-reducer reducer.py


上述程式碼通過引數input,output,mapper,reducer來定義輸入資料,輸出資料,mapper檔案,reducer檔案。

在上面的程式碼中,mapper和reducer都是可執行檔案,它們從標準輸入讀入資料(一行一行讀), 並把計算結果發給標準輸出。Streaming工具會建立一個Map/Reduce作業, 並把它傳送給合適的叢集,同時監視這個作業的整個執行過程。

如果一個可執行檔案被用於mapper,則在mapper初始化時, 每一個mapper任務會把這個可執行檔案作為一個單獨的程序啟動。 mapper任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入。 同時,mapper收集可執行檔案程序標準輸出的內容,並把收到的每一行內容轉化成key/value對,作為mapper的輸出。 預設情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。 如果沒有tab,整行作為key值,value值為null。

如果一個可執行檔案被用於reducer,每個reducer任務會把這個可執行檔案作為一個單獨的程序啟動。 Reducer任務執行時,它把輸入切分成行並把每一行提供給可執行檔案程序的標準輸入。 同時,reducer收集可執行檔案程序標準輸出的內容,並把每一行內容轉化成key/value對,作為reducer的輸出。 預設情況下,一行中第一個tab之前的部分作為key,之後的(不包括tab)作為value。

 

三 詞頻統計的例子 
 

Python實現Wordcount:

1. mapper.py

[[email protected] wordcount]# vim mapper.py
寫入

#!/usr/bin/python
import sys
word2count = {}
for line in sys.stdin:
    line = line.strip()
    words = filter(lambda word:word,line.split())
    for word in words:
        print("%s\t%s" % (word,1))


2. reducer.py

[[email protected] wordcount]# vim reducer.py
寫入

#!/usr/bin/python
from operator import itemgetter
import sys
 
word2count = {}
for line in sys.stdin:
    line = line.strip()
    word,count = line.split()
    try:
        count = int(count)
        word2count[word] = word2count.get(word,0) + count
    except ValueError as err:
        print(err)
        pass
 
sorted_word2count = sorted(word2count.items(),key=itemgetter(0))
for word,count in sorted_word2count:
    print("%s\t%s" % (word, count))


3. 準備一個測試檔案test.txt

[[email protected] wordcount]# vim test.txt
寫入

this is a test
this is a test
this is a test
this is a test


4. 本地測試

[[email protected] wordcount]# cat test.txt |python mapper.py |sort|python reducer.py

a       4
is      4
test    4
this    4


[[email protected] wordcount]#
5. 叢集執行

叢集執行前要將本地的測試檔案上傳到hdfs
[[email protected] wordcount]# hadoop fs -mkdir /user/root/wordcount
[[email protected] wordcount]# hadoop fs -put test.txt /user/root/wordcount/
[[email protected] wordcount]# hadoop fs -ls /user/root/wordcount/
Found 1 items
-rw-r--r--   3 root root         60 2018-05-14 09:58 /user/root/wordcount/test.txt
[[email protected] wordcount]#

 


執行mapreduce

[[email protected] wordcount]# hadoop jar /opt/cloudera/parcels/CDH-5.13.1-1.cdh5.13.1.p0.2/jars/hadoop-streaming-2.6.0-cdh5.13.1.jar -D mapred.reduce.tasks=1 -mapper "python mapper.py" -reducer "python reducer.py" -file mapper.py -file reducer.py -input /user/root/wordcount/test.txt -output /user/root/wordcount/out

 

命令列檢視結果

[[email protected] wordcount]# hadoop fs -cat /user/root/wordcount/out/part-00000
a	4
is	4
test	4
this	4
[[email protected] wordcount]# 

四 使用第三方的Python庫

$HADOOP_HOME/bin/hadoop streaming -D mapred.job.priority='VERY_HIGH' -D mared.job.map.capacity=500 -D mapred.reduce.tasks=0 -D mapred.map.tasks=500 -input myInputDirs(你得HDFS路徑) -output myOutputDir(你的HDFS路徑) -mapper "python  yourpythonfile.py" -reducer "python  yourpythonfile.py" -file yourpythonfile.py(需要幾個就新增幾個-file) -cacheArchive "/xx/xx/xx/myvp.tar.gz#myvp"(此處是一個HDFS路徑,稍後用到)

 

使用第三方庫

需要使用第三方庫如bs4,numpy等時,需要用到虛擬環境virtualenv
virtualenv的使用

安裝

pip install virtualenv

新建虛擬環境

virtualenv myvp

使得虛擬環境的路徑為相對路徑

virtualenv --relocatable myvp

啟用虛擬環境

source myvp/bin/activate

如果想退出,可以使用下面的命令

deactivate

啟用後直接安裝各種需要的包

pip install XXX

壓縮環境包

tar -czf myvp.tar.gz myvp

在mapreduce上使用

在上面的指令碼中可以看到使用了-catchArchive,但是路徑是HDFS的路徑,因此需要提前將本地的myvp.tai.gz包上傳到HDFS上。
同時#後面的myvp是檔案的資料夾,解壓後還有一個myvp(因為壓縮的時候把資料夾本身也壓縮排去了),所有map中使用的時候的路徑就是myvp/myvp/bin/…
在map的python指令碼中加入如下的程式碼,會把第三方庫加入到python 路徑

import sys
sys.path.append("myvp/myvp/lib/python2.7")

 

 

 

 

 

參考:

https://blog.csdn.net/wawa8899/article/details/80305720

https://blog.csdn.net/wh357589873/article/details/70049088