1. 程式人生 > >Python Hadoop Mapreduce 實現Hadoop Streaming分組和二次排序

Python Hadoop Mapreduce 實現Hadoop Streaming分組和二次排序

需求:公司給到一份全國各門店銷售資料,要求:1.按門店市場分類,將同一市場的門店放到一起;2.將各家門店按銷售額從大到小,再按利潤從大到小排列

一 需求一:按市場對門店進行分組

分組(partition)

Hadoop streaming框架預設情況下會以’/t’作為分隔符,將每行第一個’/t’之前的部分作為key,其餘內容作為value, 如果沒有’/t’分隔符,則整行作為key;這個key/value對又作為該map對應的reduce的輸入。

-D stream.map.output.field.separator 指定key,value分隔符,預設是/t ,例項如下:
-D stream.map.output.field.separator=,
將key,value以,作為分隔符

-D stream.num.map.output.key.fields 選擇key的範圍 
-D stream.num.map.output.key.fields=3
選擇前三個欄位作為key,即資料shuffle時候的key,由三個欄位組成

-D map.output.key.field.separator 指定key內部的分隔符 
-D map.output.key.field.separator=,
key直接是以,分割的

-D num.key.fields.for.partition 指定對key分出來的前幾部分做partition而不是整個key 
-D num.key.fields.for.partition=1 
選擇第一個key來進行分桶,如果想選擇任意key作為分桶,則如下
-D mapred.text.key.partitioner.options=-k1,2 
附註:-k1,2 指定對key進行劃分後第1 2個域進行劃分(沒有試過,可以嘗試一下)

1.2.資料

資料字典 store_code(店id),market_code(市場id),income(收入),pay(支出)

mcd_sale.csv
ch_sh_0003,ch_sh,500,200
ch_bj_0001,ch_bj,600,300
ch_bj_0002,ch_bj,1500,700
ch_wh_0001,ch_wh,500,200
ch_wh_0002,ch_wh,800,500
ch_sh_0001,ch_sh,1000,600
ch_sh_0002,ch_sh,800,600
ch_sh_0004,ch_sh,500,200
ch_bj_0003,ch_bj,500,200

mapreduce_partition.sh

-D map.output.key.field.separator=
, \ # key內部分隔符 -D num.key.fields.for.partition=1 \ # 用第一個key分桶 完整指令碼如下(親測可以執行) HADOOP_CMD="/root/apps/hadoop-2.6.4/bin/hadoop" STREAM_JAR_PATH="/root/apps/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar" INPUT_FILE_PATH="/data/data_coe/data_asset/bigdata/mapreduce/*.csv" OUTPUT_PATH="/data/data_coe/data_asset/bigdata/output2"
hdfs dfs -rmr $OUTPUT_PATH $HADOOP_CMD jar $STREAM_JAR_PATH \ -D stream.map.input.ignoreKey=true \ -D map.output.key.field.separator=, \ -D num.key.fields.for.partition=1 \ -numReduceTasks 3 \ -input $INPUT_FILE_PATH \ -output $OUTPUT_PATH \ -mapper "python map.py" \ -reducer "python reduce.py" \ -file ./map.py \ -file ./reduce.py \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner
# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/10/27 下午12:36
# @Author  : Einstein Yang!!
# @Nickname : 穿著開襠褲上大學
# @FileName: map_separator.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog    :https://blog.csdn.net/weixin_41734687

import sys
for line in sys.stdin:
    line = line.strip()
    seq = line.split(",")
    if len(seq) >=4:
        store_code = seq[0]
        market_code = seq[1]
        income = seq[2]
        pay = seq[3]
        try:
            netincome = int(income) - int(pay)
        except:
            netincome = 0
        print(market_code+ "," + store_code + "\t" + income + "," + pay + "," + str(netincome))
# coding=utf-8

import sys
for line in sys.stdin:
    line = line.strip()
    print line

檢視結果

hdfs dfs -ls /data/data_coe/data_asset/bigdata/output2/

-rw-r--r--   2 root supergroup          0 2018-10-28 05:30 /data/data_coe/data_asset/bigdata/output2/_SUCCESS
-rw-r--r--   2 root supergroup          0 2018-10-28 05:30 /data/data_coe/data_asset/bigdata/output2/part-00000
-rw-r--r--   2 root supergroup        205 2018-10-28 05:30 /data/data_coe/data_asset/bigdata/output2/part-00001
-rw-r--r--   2 root supergroup         58 2018-10-28 05:30 /data/data_coe/data_asset/bigdata/output2/part-00002


part-00000(無內容)

part-00001
ch_bj,ch_bj_0001	600,300,300
ch_bj,ch_bj_0002	1500,700,800
ch_bj,ch_bj_0003	500,200,300
ch_sh,ch_sh_0001	1000,600,400
ch_sh,ch_sh_0002	800,600,200
ch_sh,ch_sh_0003	500,200,300
ch_sh,ch_sh_0004	500,200,300

part-00002
ch_wh,ch_wh_0001	500,200,300
ch_wh,ch_wh_0002	800,500,300

二 需求二 按銷售額從大到小,在按利潤大到小

map.output.key.field.separator 設定key內的欄位分隔符 num.key.fields.for.partition 設定key內前幾個欄位用來做partition

事實上KeyFieldBasePartitioner還有一個高階引數 mapred.text.key.partitioner.options,這個引數可以認為是 num.key.fields.for.partition的升級版,它可以指定不僅限於key中的前幾個欄位用做partition,而是可以單獨指定 key中某個欄位或者某幾個欄位一起做partition。 比如上面的需求用mapred.text.key.partitioner.options表示為 mapred.text.key.partitioner.options=-k1,1 注意mapred.text.key.partitioner.options和num.key.fields.for.partition不需要一起使用,一起使用則以num.key.fields.for.partition為準。

二次排序(Secondary Sort)

mapper的輸出被partition到各個reducer之後,會有一步排序。預設是按照key做二次排序,如果key是多列組成,先按照第一列排序,第一列相同的,按照第二列排序 如果需要自定義排序。這裡要控制的就是key內部的哪些元素用來做排序依據,是排字典序還是數字序,倒序還是正序。用來控制的引數是mapred.text.key.comparator.options。

mapredue_partitionsort.sh

HADOOP_CMD="/root/apps/hadoop-2.6.4/bin/hadoop"
STREAM_JAR_PATH="/root/apps/hadoop-2.6.4/share/hadoop/tools/lib/hadoop-streaming-2.6.4.jar"
INPUT_FILE_PATH="/data/data_coe/data_asset/bigdata/mapreduce/*.csv"
OUTPUT_PATH="/data/data_coe/data_asset/bigdata/output2"
hdfs dfs -rmr  $OUTPUT_PATH 
$HADOOP_CMD jar $STREAM_JAR_PATH \
-D stream.map.input.ignoreKey=true \
-D stream.map.output.field.separator=, \
-D stream.num.map.output.key.fields=4 \
-D map.output.key.field.separator=, \
-D mapred.text.key.partitioner.options=-k1,1 \
-D mapred.output.key.comparator.class=org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
-D mapred.text.key.comparator.options="-k3,3nr -k4nr" \
-numReduceTasks 1 \
-input $INPUT_FILE_PATH \
-output $OUTPUT_PATH \
-mapper "python map_sort.py" \
-reducer "python reduce_sort.py" \
-file ./map_sort.py \
-file ./reduce_sort.py \
-partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner 

map_sort.py

# !/usr/bin/python
# -*- coding: utf-8 -*-
# @Time    : 2018/10/27 下午12:36
# @Author  : Einstein Yang!!
# @Nickname : 穿著開襠褲上大學
# @FileName: map_separator.py
# @Software: PyCharm
# @PythonVersion: python3.5
# @Blog    :https://blog.csdn.net/weixin_41734687

import sys
for line in sys.stdin:
    line = line.strip()
    seq = line.split(",")
    if len(seq) >=4:
        store_code = seq[0]
        market_code = seq[1]
        income = seq[2]
        pay = seq[3]
        try:
            netincome = int(income) - int(pay)
        except:
            netincome = 0
        print(market_code+ "," + store_code + "," + income + "," + str(netincome) + "," + pay )

reduce_sort.py

# coding=utf-8


import sys
for line in sys.stdin:
    line = line.strip()
    print line

結果檢視

ch_bj,ch_bj_0002,1500,800	700
ch_sh,ch_sh_0001,1000,400	600
ch_wh,ch_wh_0002,800,300	500
ch_sh,ch_sh_0002,800,200	600
ch_bj,ch_bj_0001,600,300	300
ch_wh,ch_wh_0001,500,300	200
ch_sh,ch_sh_0003,500,300	200
ch_bj,ch_bj_0003,500,300	200
ch_sh,ch_sh_0004,500,300	200