1. 程式人生 > >Python之——使用原生Python編寫Hadoop MapReduce程式(基於Hadoop 2.5.2)

Python之——使用原生Python編寫Hadoop MapReduce程式(基於Hadoop 2.5.2)

一、簡單說明

本例中我們實現一個統計文字檔案中所有單詞出現的詞頻功能,這裡我們使用原生的Python來編寫MapReduce。同時,本例中我們將要輸入的單詞文字input.txt和Python指令碼放到/usr/local/python/source目錄下。文字內容如下:

hello hello liuyazhuang lyz liuyazhuang lyz where is your home home see you by test welcome test adc abc labs me python hadoop ab bc bec python hadoop bar ccc bar ccc bbb aaa bbb iii ooo xxx yyy xxyy xxx iii ooo yyy

二、安裝Zookeeper叢集

三、安裝Hadoop

1、偽分散式安裝

2、 叢集安裝

3、 高可用叢集安裝

這篇博文中,我也是在單節點上安裝的Hadoop,將HBase和Hadoop安裝在了同一臺伺服器上。由於HBase的執行依賴於Zookeeper,所以,在同一臺伺服器上,又安裝了單節點的Zookeeper。

四、安裝Storm叢集

五、編寫Map程式碼

這裡我們建立一個mapper.py指令碼,從標準輸入(stdin)讀取資料,預設以空格分隔單詞,然後按行輸出單詞機器詞頻到標準輸出(stdout),整個Map處理過程不會統計每個單詞出現的總次數,而是直接輸出“word 1”,以便作為Reduce的輸入進行統計,要求mapper.py具備可執行許可權,執行chmod +x /usr/local/python/source/mapper.py。

【/usr/local/python/source/mapper.py】

#!/usr/bin/env python
# -*- coding:UTF-8 -*-
'''
Created on 2018年1月14日

@author: liuyazhuang
'''
import sys
#輸入為標準輸入stdin
for line in sys.stdin:
    #刪除開頭和結尾的空格
    line = line.strip()
    #以預設空格分隔行單詞到words列表
    words = line.split()
    for word in words:
        #輸出所有單詞,格式為“單詞,1”以便作為Reduce的輸入
        print '%s\t%s' % (word, 1)

六、編寫Reduce程式碼

這裡我們建立一個reducer.py指令碼,從標準輸入(stdin)讀取mapper.py的結果,然後統計每個單詞出現的總次數並輸出到標準輸出(stdout),要求reducer.py具備可執行執行,執行chmod +x /usr/local/python/source/reducer.py

【/usr/local/python/source/reducer.py】

#!/usr/bin/env python
# -*- coding:UTF-8 -*-
'''
Created on 2018年1月14日

@author: liuyazhuang
'''

#from operator import itemgetter
import sys

current_word = None
current_count = 0
word = None

#獲取標準輸入,即mapper.py的輸出
for line in sys.stdin:
    #刪除開頭和結尾的空格
    line = line.strip()
    
    #解析mapper.py輸出作為程式的輸入,以tab作為分隔符
    word, count = line.split('\t', 1)
    
    #轉換count從字元型成整型
    try:
        count = int(count)
    except ValueError:
        #count不是資料時,忽略此行
        continue
    
    #要求mapper.py的輸出做排序操作,以便對連續的word做判斷,hadoop會自動排序
    if current_word  == word:
        current_count += count
    else:
        if current_word:
            #輸出當前word統計結果到標準輸出
            print '%s\t%s' % (current_word, current_count)
        current_count = count
        current_word = word
        
#輸出最後一個word統計
if current_word == word:
    print '%s\t%s' % (current_word, current_count)           

七、測試程式碼

我們可以在Hadoop平臺執行之前在本地測試,校驗mapper.py與reducer.py執行的結果是否正確。

注意:測試reducer.py時需要對mapper.py的輸出做排序(sort)操作,不過,Hadoop環境會自動實現排序

1、本地執行mapper.py

[[email protected] source]# cat input.txt | ./mapper.py 
hello   1
hello   1
liuyazhuang     1
lyz     1
liuyazhuang     1
lyz     1
where   1
is      1
your    1
home    1
home    1
see     1
you     1
by      1
  st    1
welcome 1
test    1
adc     1
abc     1
labs    1
me      1
python  1
hadoop  1
ab      1
bc      1
bec     1
python  1
hadoop  1
bar     1
ccc     1
bar     1
ccc     1
bbb     1
aaa     1
bbb     1
iii     1
ooo     1
xxx     1
yyy     1
xxyy    1
xxx     1
iii     1
ooo     1
yyy     1
輸出了Map的結果

2、本地執行reducer.py

[[email protected] source]# cat input.txt  | ./mapper.py | sort -k1,1 | ./reducer.py 
aaa     1
ab      1
abc     1
adc     1
bar     2
bbb     2
bc      1
bec     1
by      1
ccc     2
hadoop  2
hello   2
home    2
iii     2
is      1
labs    1
liuyazhuang     2
lyz     2
me      1
ooo     2
python  2
see     1
test    2
welcome 1
where   1
xxx     2
xxyy    1
you     1
your    1
yyy     2
輸出了Reduce的結果。

八、在Hadoop平臺執行程式碼

1、建立目錄並上傳檔案

首先在HDFS上建立文字檔案儲存目錄,本例項為/user/root/word,執行如下命令:

hdfs dfs -mkdir /user/root/word
上傳文字檔案到HDFS,本例項中為/usr/local/python/source/input.txt,如果有多個檔案,可採用以下方法進行操作,Hadoop分析目標預設針對目錄,目錄下的檔案都在運算範圍中。
[[email protected] source]# hadoop fs -put /usr/local/python/source/input.txt /user/root/word/
[[email protected] source]# hadoop fs -ls /user/root/word/           
Found 1 items
-rw-r--r--   1 root supergroup        215 2018-01-14 09:59 /user/root/word/input.txt

2、執行MapReduce程式

這裡,我們輸出結果檔案制定/output/word,執行以下命令:

[[email protected] source]# hadoop jar /usr/local/hadoop-2.5.2/share/hadoop/tools/lib/hadoop-streaming-2.5.2.jar -file ./mapper.py -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py -input /user/root/word -output /output/word
可以看到map及reducer的百分比,打印出的log如下:
18/01/14 10:54:19 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./mapper.py, ./reducer.py, /usr/local/hadoop-2.5.2/tmp/hadoop-unjar3958497380381943575/] [] /tmp/streamjob1400075475828443108.jar tmpDir=null
18/01/14 10:54:22 INFO client.RMProxy: Connecting to ResourceManager at liuyazhuang121/192.168.209.121:8032
18/01/14 10:54:22 INFO client.RMProxy: Connecting to ResourceManager at liuyazhuang121/192.168.209.121:8032
18/01/14 10:54:24 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/14 10:54:25 INFO mapreduce.JobSubmitter: number of splits:2
18/01/14 10:54:25 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1515893542122_0001
18/01/14 10:54:26 INFO impl.YarnClientImpl: Submitted application application_1515893542122_0001
18/01/14 10:54:26 INFO mapreduce.Job: The url to track the job: http://liuyazhuang121:8088/proxy/application_1515893542122_0001/
18/01/14 10:54:26 INFO mapreduce.Job: Running job: job_1515893542122_0001
18/01/14 10:54:43 INFO mapreduce.Job: Job job_1515893542122_0001 running in uber mode : false
18/01/14 10:54:43 INFO mapreduce.Job:  map 0% reduce 0%
18/01/14 10:55:16 INFO mapreduce.Job:  map 33% reduce 0%
18/01/14 10:55:17 INFO mapreduce.Job:  map 100% reduce 0%
18/01/14 10:55:31 INFO mapreduce.Job:  map 100% reduce 100%
18/01/14 10:55:32 INFO mapreduce.Job: Job job_1515893542122_0001 completed successfully
18/01/14 10:55:32 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=398
                FILE: Number of bytes written=302280
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=529
                HDFS: Number of bytes written=202
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=62800
                Total time spent by all reduces in occupied slots (ms)=11416
                Total time spent by all map tasks (ms)=62800
                Total time spent by all reduce tasks (ms)=11416
                Total vcore-seconds taken by all map tasks=62800
                Total vcore-seconds taken by all reduce tasks=11416
                Total megabyte-seconds taken by all map tasks=64307200
                Total megabyte-seconds taken by all reduce tasks=11689984
        Map-Reduce Framework
                Map input records=1
                Map output records=44
                Map output bytes=304
                Map output materialized bytes=404
                Input split bytes=206
                Combine input records=0
                Combine output records=0
                Reduce input groups=30
                Reduce shuffle bytes=404
                Reduce input records=44
                Reduce output records=30
                Spilled Records=88
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=159
                CPU time spent (ms)=3040
                Physical memory (bytes) snapshot=571060224
                Virtual memory (bytes) snapshot=2657177600
                Total committed heap usage (bytes)=378011648
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=323
        File Output Format Counters 
                Bytes Written=202
18/01/14 10:55:32 INFO streaming.StreamJob: Output directory: /output/word
這裡,我們輸入如下命令檢視結果:
[[email protected] source]# hadoop fs -ls /output/word
Found 2 items
-rw-r--r--   1 root supergroup          0 2018-01-14 10:55 /output/word/_SUCCESS
-rw-r--r--   1 root supergroup        202 2018-01-14 10:55 /output/word/part-00000
[[email protected] source]# 
其中,part-00000存放了我們的分析結果,下面我們檢視結果:
[[email protected] source]# hadoop fs -cat /output/word/part-00000
aaa     1
ab      1
abc     1
adc     1
bar     2
bbb     2
bc      1
bec     1
by      1
ccc     2
hadoop  2
hello   2
home    2
iii     2
is      1
labs    1
liuyazhuang     2
lyz     2
me      1
ooo     2
python  2
see     1
test    2
welcome 1
where   1
xxx     2
xxyy    1
you     1
your    1
yyy     2
可見,結果與我們在測試的時候結果一致。

為了簡化我們執行Hadoop MapReduce的命令,我們可以將Hadoop的hadoop-streaming-*.jar加入到系統環境變數/etc/profile中,在/etc/profile檔案中新增如下配置:

HADOOP_STREAM=$HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar
export HADOOP_STREAM
這裡我們之前就配置了Hadoop的環境變數。

此時,我們執行以下命令來執行MapReduce程式

[[email protected] source]# hadoop jar $HADOOP_STREAM -file ./mapper.py -mapper ./mapper.py -file ./reducer.py -reducer ./reducer.py -input /user/root/word -output /output/word1
我們同樣可以看到Map和Reduce執行的百分比,執行的log日誌如下:
18/01/14 11:04:46 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [./mapper.py, ./reducer.py, /usr/local/hadoop-2.5.2/tmp/hadoop-unjar2463144927504143769/] [] /tmp/streamjob3106204875058057023.jar tmpDir=null
18/01/14 11:04:47 INFO client.RMProxy: Connecting to ResourceManager at liuyazhuang121/192.168.209.121:8032
18/01/14 11:04:48 INFO client.RMProxy: Connecting to ResourceManager at liuyazhuang121/192.168.209.121:8032
18/01/14 11:04:48 INFO mapred.FileInputFormat: Total input paths to process : 1
18/01/14 11:04:48 INFO mapreduce.JobSubmitter: number of splits:2
18/01/14 11:04:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1515893542122_0002
18/01/14 11:04:49 INFO impl.YarnClientImpl: Submitted application application_1515893542122_0002
18/01/14 11:04:49 INFO mapreduce.Job: The url to track the job: http://liuyazhuang121:8088/proxy/application_1515893542122_0002/
18/01/14 11:04:49 INFO mapreduce.Job: Running job: job_1515893542122_0002
18/01/14 11:04:55 INFO mapreduce.Job: Job job_1515893542122_0002 running in uber mode : false
18/01/14 11:04:55 INFO mapreduce.Job:  map 0% reduce 0%
18/01/14 11:05:05 INFO mapreduce.Job:  map 100% reduce 0%
18/01/14 11:05:19 INFO mapreduce.Job:  map 100% reduce 100%
18/01/14 11:05:19 INFO mapreduce.Job: Job job_1515893542122_0002 completed successfully
18/01/14 11:05:20 INFO mapreduce.Job: Counters: 49
        File System Counters
                FILE: Number of bytes read=398
                FILE: Number of bytes written=302283
                FILE: Number of read operations=0
                FILE: Number of large read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=529
                HDFS: Number of bytes written=202
                HDFS: Number of read operations=9
                HDFS: Number of large read operations=0
                HDFS: Number of write operations=2
        Job Counters 
                Launched map tasks=2
                Launched reduce tasks=1
                Data-local map tasks=2
                Total time spent by all maps in occupied slots (ms)=15700
                Total time spent by all reduces in occupied slots (ms)=10749
                Total time spent by all map tasks (ms)=15700
                Total time spent by all reduce tasks (ms)=10749
                Total vcore-seconds taken by all map tasks=15700
                Total vcore-seconds taken by all reduce tasks=10749
                Total megabyte-seconds taken by all map tasks=16076800
                Total megabyte-seconds taken by all reduce tasks=11006976
        Map-Reduce Framework
                Map input records=1
                Map output records=44
                Map output bytes=304
                Map output materialized bytes=404
                Input split bytes=206
                Combine input records=0
                Combine output records=0
                Reduce input groups=30
                Reduce shuffle bytes=404
                Reduce input records=44
                Reduce output records=30
                Spilled Records=88
                Shuffled Maps =2
                Failed Shuffles=0
                Merged Map outputs=2
                GC time elapsed (ms)=167
                CPU time spent (ms)=3260
                Physical memory (bytes) snapshot=598515712
                Virtual memory (bytes) snapshot=2668818432
                Total committed heap usage (bytes)=429916160
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
        File Input Format Counters 
                Bytes Read=323
        File Output Format Counters 
                Bytes Written=202
18/01/14 11:05:20 INFO streaming.StreamJob: Output directory: /output/word1
此時,我們檢視結果,也是和之前一樣的。

相關推薦

Python——使用原生Python編寫Hadoop MapReduce程式(基於Hadoop 2.5.2)

一、簡單說明 本例中我們實現一個統計文字檔案中所有單詞出現的詞頻功能,這裡我們使用原生的Python來編寫MapReduce。同時,本例中我們將要輸入的單詞文字input.txt和Python指令碼放到/usr/local/python/source目錄下。文字內容如下:

Python——病毒檢測模組pyClamad的安裝(基於CentOS 6.5系統)

一、ClamAV與pyClamad概述 Clam AntiVirus(ClamAV)是一款免費而且開放原始碼的防毒軟體,軟體與病毒庫的更新皆由社群變肥釋出,官網地址為:http://www.clamav.net/。 目前ClamAV主要為Linux、Unix系統提供病毒掃

Python編寫Hadoop MapReduce程式

adoop 的 MapReduce 程式,使用的是 Java ,但是使用 Java 很明顯的一個弊端就是每次都要編碼、打包、上傳、執行,還真心是麻煩,想要更加簡單的使用 Hadoop 的運算能力,想要寫 MapReduce程式不那麼複雜。還真是個問題。 仔細考慮了下,p

Python寫一個 Hadoop MapReduce 程式

01 [email protected]:/usr/local/hadoop$ bin/hadoop jar contrib/streaming/hadoop-*streaming*.jar -mapper /home/hduser/mapper.py -reducer /home/hduser

擴充套件PythonPython中呼叫C編寫的函式模組

目錄 編寫Python擴充套件 1. 建立應用程式碼 2. 根據樣板編寫封裝程式碼 2.1 包含Python標頭檔案 2.2 為每一個模組函式新增形如PyObject* *Module_func()* 的封裝函式

利用Python的SocketServer框架編寫網路服務程式

1.前言:        雖說用Python編寫簡單的網路程式很方便,但複雜一點的網路程式還是用現成的框架比較好。這樣就可以專心事務邏輯,而不是套接字的各種細節。SocketServer模組簡化了編寫網路服務程式的任務。同時SocketServer模組也是Python標準庫

Python 簡化引用socket編寫TCP服務程式

''' Python 簡化引用socket編寫TCP服務程式 by 鄭瑞國 1、引用網路模組socket 2、建立一個TCP網路套接字s 3、繫結本機IP地址和指定埠號 4、開始監聽 5、接受連線 6、接受資訊 ''' from socket import *

Python——自動上傳本地log檔案到HDFS(基於Hadoop 2.5.2)

一、場景描述 比如我們的網站共有5臺Web裝置,日誌檔案存放在/data/logs/日期(20180114)/access.log。日誌為預設的Nginx定義格式,如下所示: 10.2.2.234 - - [12/Jan/2018:08:36:23 +0800] "PO

Python埠掃描器編寫

其實,寫個掃描器也挺好玩的,牽涉到了RAW Socket程式設計,可以盡情地DIY資料包(當然,不符合資料包規則,比如checksum錯誤就沒辦法了),收穫頗深。其中,我覺得用C語言寫更有利於在編寫過程中對加深對計算機網路的理解,特別是資料包細節。但是由於效率問題,還有P

Python旅-Python基礎4-數據類型

都是 指定 hello double 裏的 移除 空間 class 字符數組 1. 數字 2是一個整數的例子。長整數不過是大一些的整數,3.23和52.3E-4是浮點數的例子。E標記表示10的冪。在這裏,52.3E-4表示52.3 * 10-4. (-5+4j)和(2.3-

Python路--Python基礎9--Socket編程

主機 make 深入理解 pre odi data splay 異常 inpu 一、socket介紹   Socket是應用層與TCP/IP協議族通信的中間軟件抽象層,它是一組接口。在設計模式中,Socket其實就是一個門面模式,它把復雜的TCP/IP協議族隱藏在Sock

Python路--Python基礎

圖片 數據 png red display socket 並發編程 memcached 異步 目錄:   第一篇:初識Python   第二篇:數據類型     第三篇:數據運算、控制流、文件操作   第四篇:函數   第五篇:內置函數   第六篇:模

深入理解python二——python列表和元組

n) 數據 兩種 性能 執行 效率 動態 單元 這一 從一開始學習python的時候,很多人就聽到的是元組和列表差不多,區別就是元組不可以改變,列表可以改變。 從數據結構來說,這兩者都應當屬於數組,元組屬於靜態的數組,而列表屬於動態數組。稍後再內存的分配上也會體現這一點。對

PythonPython內建函式、zip()、max()、min() PythonPython內建函式、zip()、max()、min()

Python之路Python內建函式、zip()、max()、min() 一、python內建函式 abs() 求絕對值 例子 print(abs(-2)) all() 把序列中每一個元素做布林運算,如果全部都是true,就返回true,

PythonPython作用域、匿名函式、函數語言程式設計、map函式、filter函式、reduce函式 PythonPython作用域、匿名函式、函數語言程式設計、map函式、filter函式、reduce函式

Python之路Python作用域、匿名函式、函數語言程式設計、map函式、filter函式、reduce函式 一、作用域 return 可以返回任意值例子 def test1(): print("test1") def test(): print("te

PythonPython全域性變數與區域性變數、函式多層巢狀、函式遞迴 PythonPython全域性變數與區域性變數、函式多層巢狀、函式遞迴

Python之路Python全域性變數與區域性變數、函式多層巢狀、函式遞迴 一、區域性變數與全域性變數   1、在子程式中定義的變數稱為區域性變數,在程式的一開始定義的變數稱為全域性變數。全域性變數作用域是整個程式,區域性變數作用域是定義該變數的子程式。 全域性變數

Hadoop-mapreduce 程式在windows上執行需要注意的問題

1.在主程式中需要新增這幾個引數配置 Configuration conf = new Configuration(); // 1、設定job執行時要訪問的預設檔案系統 conf.set("fs.defaultFS", HADOOP_ROOT_PATH);

Python學習【第3篇】:Python運算子 python-----運算子及while迴圈

python-----運算子及while迴圈 一、運算子 計算機可以進行的運算有很多種,不只是加減乘除,它和我們人腦一樣,也可以做很多運算。 種類:算術運

JavaSE路02--編寫一個Java程式

1.開發第一個簡單的JAVA程式(必會) class Test { public static void main(String[] args){ System.out.println(“hello word”); } } 說明: 1.首先定義一個類 格式: c

Python路-python(mysql介紹和安裝、pymysql、ORM sqlachemy)

轉載至:北極之光部落格 本節內容 1、資料庫介紹 2、mysql管理 3、mysql資料型別 4、常用mysql命令   建立資料庫   外來鍵   增刪改查表 5、事務 6、索引 7、python 操作mysq