在Hadoop上用Python實現WordCount
在hadoop上用Python實現WordCount
一、簡單說明
本例中我們用Python寫一個簡單的運行在Hadoop上的MapReduce程序,即WordCount(讀取文本文件並統計單詞的詞頻)。這裏我們將要輸入的單詞文本input.txt和Python腳本放到/home/data/python/WordCount目錄下。
cd /home/data/python/WordCount
vi input.txt
輸入:
There is no denying that
hello python
hello mapreduce
mapreduce is good
二、編寫Map代碼
這裏我們創建一個mapper.py腳本,從標準輸入(stdin)讀取數據,默認以空格分隔單詞,然後按行輸出單詞機器詞頻到標準輸出(stdout),整個Map處理過程不會統計每個單詞出現的總次數,而是直接輸出“word 1”,以便作為Reduce的輸入進行統計,確保該文件是可執行的(chmod +x /home/data/python//WordCount/mapper.py)。
cd /home/data/python//WordCount
vi mapper.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys
#輸入為標準輸入stdin
for line in sys.stdin:
#刪除開頭和結尾的空格
line = line.strip()
#以默認空格分隔行單詞到words列表
words = line.split()
for word in words:
#輸出所有單詞,格式為“單詞,1”以便作為Reduce的輸入
print(‘%s\t%s‘ %(word,1))
#截圖如下:
三、編寫Reduce代碼
這裏我們創建一個reducer.py腳本,從標準輸入(stdin)讀取mapper.py的結果,然後統計每個單詞出現的總次數並輸出到標準輸出(stdout),
確保該文件是可執行的(chmod +x /home/data/python//WordCount/reducer.py)
cd /home/data/python//WordCount
vi reducer.py
#!/usr/bin/env python
# -*- coding:UTF-8 -*-
import sys
current_word = None
current_count = 0
word = None
#獲取標準輸入,即mapper.py的輸出
for line in sys.stdin:
#刪除開頭和結尾的空格
line = line.strip()
#解析mapper.py輸出作為程序的輸入,以tab作為分隔符
word,count = line.split(‘\t‘,1)
#轉換count從字符型為整型
try:
count = int(count)
except ValueError:
#count不是數據時,忽略此行
continue
#要求mapper.py的輸出做排序操作,以便對連接的word做判斷,hadoop會自動排序
if current_word == word:
current_count += count
else:
if current_word:
#輸出當前word統計結果到標準輸出
print(‘%s\t%s‘ %(current_word,current_count))
current_count = count
current_word = word
#輸出最後一個word統計
if current_word == word
print(‘%s\%s‘ %(current_word,current_count))
#截圖如下:
四、本地測試代碼
我們可以在Hadoop平臺運行之前在本地測試,校驗mapper.py與reducer.py運行的結果是否正確。註意:測試reducer.py時需要對mapper.py的輸出做排序(sort)操作,不過,Hadoop環境會自動實現排序。
#在本地運行mapper.py:
cd /home/data/python/WordCount/
#記得執行: chmod +x /home/data/python//WordCount/mapper.py
cat input.txt | ./mapper.py
#在本地運行reducer.py
#記得執行:chmod +x /home/data/python//WordCount/reducer.py
cat input.txt | ./mapper.py | sort -k1,1 | ./reducer.py
五、在Hadoop平臺上運行代碼
在hadoop運行代碼,前提是已經搭建好hadoop集群
1、創建目錄並上傳文件
首先在HDFS上創建文本文件存儲目錄,這裏我創建為:/WordCound
hdfs dfs -mkdir /WordCound
#將本地文件input.txt上傳到hdfs的/WordCount上。
hadoop fs -put /home/data/python/WordCount/input.txt /WordCount
hadoop fs -ls /WordCount #查看在hdfs中/data/WordCount目錄下的內容
2、執行MapReduce程序
為了簡化我們執行Hadoop MapReduce的命令,我們可以將Hadoop的hadoop-streaming-3.0.0.jar加入到系統環境變量/etc/profile中,在/etc/profile文件中添加如下配置:
首先在配置裏導入hadoop-streaming-3.0.0.jar
vi /etc/profile
HADOOP_STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-3.0.0.jar
export HADOOP_STREAM
source /etc/profile #刷新配置
#執行以下命令:
hadoop jar $HADOOP_STREAM -file /home/data/python/WordCount/mapper.py -mapper ./mapper.py -file /home/data/python/WordCount/reducer.py -reducer ./reducer.py -input /WordCount -output /output/word1
得到:
然後,輸入以下命令查看結果:
hadoop fs -ls /output/word1
hadoop fs -cat /output/word1/part-00000 #查看分析結果
可以發現,結果與之前測試的時候是一致的,那麽恭喜你,大功告成!
在Hadoop上用Python實現WordCount