
Code example: running a Spark program in PyCharm with a file on HDFS or a local file as input

A Python code example that runs a Spark program in PyCharm, takes a file on HDFS or a local file as input, and saves the computed result back to HDFS.

(1) Prepare the input data files:

Prepare the input data files 2017-11-01.txt and 2017-11-10.txt. Within each line the fields are separated by "\t", and lines are separated by newlines. Upload them to the /input directory on HDFS. Their contents are as follows:

localhost:experiment_data a6$ more 2017-11-01.txt
AA      10001   2017-10-06
BB      10002   2017-10-07
CC      10003   2017-10-08
AA      10004   2017-10-09
DD      10003   2017-10-20
localhost:experiment_data a6$ more 2017-11-10.txt
AA      10009   2017-11-06
BB      10002   2017-11-07
CC      10004   2017-11-08
AA      10006   2017-11-09
DD      10003   2017-11-20
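
Each line therefore splits on "\t" into exactly three fields (hbsid, user_id, date). A quick parsing check, mirroring what the get_data function in the script below does (the literal line here is just a sample taken from the data above):

    line = "AA\t10001\t2017-10-06"
    hbsid, user_id, curr_date = line.strip().split("\t")
    print (hbsid, (user_id, curr_date))   # ('AA', ('10001', '2017-10-06'))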

(2) The Python Spark code is as follows

#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# File Name:    filter_and_retain_lastest_hbsid_state.py
#
# Function:
#
#
# Author: yz
#
# Create Time:    2016-11-07 16:31:54
#
######################################################
import sys
reload(sys)
sys.setdefaultencoding("utf-8")
from pyspark import SparkContext, SparkConf

def get_data(line):
    """Parse one line of the form 'hbsid\tuser_id\tdate' into (hbsid, (user_id, date))."""
    line = line.strip()
    ary = line.split('\t')
    if len(ary) != 3:
        # malformed lines are returned as plain strings and filtered out later
        return line
    hbsid, user_id, curr_date = ary
    return (hbsid, (user_id, curr_date))

def form_max_hbsid_userid(data_in):
    """Format a (hbsid, (user_id, date)) pair as 'hbsid<TAB>user_id;date'."""
    key, values = data_in
    return '\t'.join([key, str(values[0]) + ";" + str(values[1])])

def main():
    conf = SparkConf().setAppName("merge_hbsid_userid")
    sc = SparkContext(conf=conf)
    # HDFS input path
    #input_data_path = "hdfs://localhost:9002/input/2017-11-01*"
    # local input path
    input_data_path = "file:///Users/a6/Downloads/PycharmProjects/speiyou_di_my/userid_hbsid_map_1107_final/input_local/2017-11-01*"
    result = sc.textFile(input_data_path)
    result = result.map(get_data)
    # keep only the successfully parsed lines (tuples); drop malformed ones (plain strings)
    result = result.filter(lambda x: isinstance(x, tuple))
    result = result.map(form_max_hbsid_userid)
    # save the result back to HDFS
    result.saveAsTextFile("hdfs://localhost:9002/output/2017-11.res")

main()

'''
hadoop fs -cat hdfs://localhost:9002/output/2017-11.res/*
hadoop fs -rmr hdfs://localhost:9002/output/2017-11.res
'''
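
Two notes on the job above. First, saveAsTextFile will fail if the output directory already exists, which is why the hadoop fs -rmr helper command above is handy before a rerun. Second, judging by the script name (filter_and_retain_lastest_hbsid_state.py) and form_max_hbsid_userid, the eventual goal appears to be keeping only the most recent record per hbsid, while the code as written simply reformats every parsed record. If you did want only the latest record per key, a minimal sketch (my assumption about the intended semantics, not part of the original job) would insert a reduceByKey step in main() right after result = result.map(get_data):

    # Hypothetical extension: keep only the newest (user_id, date) per hbsid.
    # Assumes dates are 'YYYY-MM-DD' strings, so string comparison orders them correctly.
    pairs = result.filter(lambda x: isinstance(x, tuple))   # (hbsid, (user_id, date))
    latest = pairs.reduceByKey(lambda a, b: a if a[1] >= b[1] else b)
    latest.map(form_max_hbsid_userid).saveAsTextFile("hdfs://localhost:9002/output/2017-11.latest.res")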

(3) Configure the run parameters


Configure the run parameters in PyCharm's run configuration, or simply add Spark's installation directory inside the main() function, which is enough:

    os.environ["SPARK_HOME"] = "/Users/a6/Applications/spark-2.1.0-bin-hadoop2.6"

A complete example:

# -*- coding:utf-8 -*-
from pyspark import SparkConf
from pyspark import SparkContext
import os

if __name__ == '__main__':
    # point the script at the local Spark installation
    os.environ["SPARK_HOME"] = "/Users/a6/Applications/spark-2.1.0-bin-hadoop2.6"
    conf = SparkConf().setMaster('local').setAppName('group')
    sc = SparkContext(conf=conf)
    data = [('tom', 90), ('jerry', 97), ('luck', 92), ('tom', 78), ('luck', 64), ('jerry', 50)]
    rdd = sc.parallelize(data)
    # group scores by name (Python 2 print statement)
    print rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()

# Output:
[('tom', [90, 78]), ('jerry', [97, 50]), ('luck', [92, 64])]

(4) Run the program; the console output and the result are as follows:

1) Console output:

/System/Library/Frameworks/Python.framework/Versions/2.7/bin/python2.7 /Users/a6/Downloads/PycharmProjects/userid_hbsid_map_final/filter_and_retain_lastest_hbsid_state.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/07 19:50:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/07 19:50:48 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 10.2.32.209 instead (on interface en0)
17/11/07 19:50:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
                                                                                
Process finished with exit code 0

2) The result:

localhost:input_local a6$ hadoop fs  -cat hdfs://localhost:9002/output/2017-11.res/*
17/11/07 19:57:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
AA	10001;2017-10-06
BB	10002;2017-10-07
CC	10003;2017-10-08
AA	10004;2017-10-09
DD	10003;2017-10-20
AA	10009;2017-11-06
BB	10002;2017-11-07
CC	10004;2017-11-08
AA	10006;2017-11-09
DD	10003;2017-11-20
localhost:input_local a6$
Note the path format for each of the two cases:

    # HDFS path
    input_data_path = "hdfs://localhost:9002/input/2017-11-01*"
    # local path
    input_data_path = "file:///Users/a6/Downloads/PycharmProjects/speiyou_di_my/userid_hbsid_map_1107_final/input_local/2017-11-01*"