Running a Spark Program in PyCharm with HDFS or Local Files as Input: Python Code Example
This is a Python code example that runs a Spark program in PyCharm, takes either a file on HDFS or a local file as input, and saves the computed results back to HDFS.
(1) Prepare the input data files:
Prepare two input data files, 2017-11-01.txt and 2017-11-10.txt, and upload them to the /input directory on HDFS. Within each line, fields are separated by "\t"; lines are separated by newline characters. The contents are as follows:
localhost:experiment_data a6$ more 2017-11-01.txt
AA	10001	2017-10-06
BB	10002	2017-10-07
CC	10003	2017-10-08
AA	10004	2017-10-09
DD	10003	2017-10-20
localhost:experiment_data a6$ more 2017-11-10.txt
AA	10009	2017-11-06
BB	10002	2017-11-07
CC	10004	2017-11-08
AA	10006	2017-11-09
DD	10003	2017-11-20
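If the two files are not on HDFS yet, they can also be uploaded from a Python script. A minimal sketch using subprocess (the /input target directory and a hadoop CLI on the PATH are assumptions taken from this article's setup):

import subprocess

# create the target directory on HDFS (-p makes this a no-op if it exists)
subprocess.check_call(["hadoop", "fs", "-mkdir", "-p", "/input"])

# upload both data files; -put fails if the destination file already exists
for f in ["2017-11-01.txt", "2017-11-10.txt"]:
    subprocess.check_call(["hadoop", "fs", "-put", f, "/input/"])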
(2) The Python Spark code is as follows:
#!/usr/bin/env python # -*- coding: utf-8 -*- # # File Name: filter_and_retain_lastest_hbsid_state.py # # Function: # # # Author: yz # # Create Time: 2016-11-07 16:31:54 # ###################################################### import sys reload(sys) sys.setdefaultencoding("utf-8") from pyspark import SparkContext, SparkConf def get_data(line): # line = '\t'.join([str(hbsid), str(user_id),str(Global_date)]) line = line.strip() ary= line.split('\t') if len(ary) != 3: return line hbsid,user_id,curr_date=ary #print (hbsid, (user_id,curr_date)) return (hbsid, (user_id,curr_date)) def form_max_hbsid_userid(data_in): key, values = data_in #print key,values #return '\t'.join([key, values[0],values[1]]) return '\t'.join([key, str(values[0])+";"+str(values[1])]) def main(): conf = SparkConf().setAppName("merge_hbsid_userid") sc = SparkContext(conf=conf) '''# hdfs目錄''' #input_data_path = "hdfs://localhost:9002/input/2017-11-01*" '''# 本地目錄''' input_data_path="file:///Users/a6/Downloads/PycharmProjects/speiyou_di_my/userid_hbsid_map_1107_final/input_local/2017-11-01*" #print input_data_path result = sc.textFile(input_data_path) result = result.map(get_data) #print result #print "result.collect()", result.collect() result = result.filter(lambda x: isinstance(x, tuple) is True) result = result.map(form_max_hbsid_userid) result.saveAsTextFile("hdfs://localhost:9002/output/2017-11.res") main() ''' hadoop fs -cat hdfs://localhost:9002/output/2017-11-01.res/* hadoop fs -rmr hdfs://localhost:9002/output/2017-11-01.res '''
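Before submitting anything to Spark, the two helpers can be sanity-checked in a plain Python shell. A standalone sketch (the helpers are copied from the script above, trimmed of comments; the sample line matches the input format):

# Python 2, matching the environment used in this article
def get_data(line):
    ary = line.strip().split('\t')
    if len(ary) != 3:
        return line  # malformed line, returned unchanged
    hbsid, user_id, curr_date = ary
    return (hbsid, (user_id, curr_date))

def form_max_hbsid_userid(data_in):
    key, values = data_in
    return '\t'.join([key, str(values[0]) + ";" + str(values[1])])

record = get_data("AA\t10001\t2017-10-06")
print record                         # ('AA', ('10001', '2017-10-06'))
print form_max_hbsid_userid(record)  # AA	10001;2017-10-06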
(3) Configure the run parameters
Alternatively, instead of configuring the run parameters in PyCharm, you can add Spark's installation directory inside the code itself (e.g., in the main() function):
os.environ["SPARK_HOME"] = "/Users/a6/Applications/spark-2.1.0-bin-hadoop2.6"
A complete example:
# -*- coding:utf-8 -*-
from pyspark import SparkConf
from pyspark import SparkContext
import os

if __name__ == '__main__':
    os.environ["SPARK_HOME"] = "/Users/a6/Applications/spark-2.1.0-bin-hadoop2.6"
    conf = SparkConf().setMaster('local').setAppName('group')
    sc = SparkContext(conf=conf)
    data = [('tom', 90), ('jerry', 97), ('luck', 92), ('tom', 78), ('luck', 64), ('jerry', 50)]
    rdd = sc.parallelize(data)
    print rdd.groupByKey().map(lambda x: (x[0], list(x[1]))).collect()
    # Output: [('tom', [90, 78]), ('jerry', [97, 50]), ('luck', [92, 64])]
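Note that setting SPARK_HOME only tells Spark where its runtime lives; the interpreter also has to be able to import pyspark. If PyCharm reports that pyspark cannot be found, a common fix is to put Spark's Python libraries on sys.path before the imports. A sketch under this article's paths (the py4j zip name is the one bundled with the Spark 2.1.0 distribution; adjust it for other versions):

import os
import sys

os.environ["SPARK_HOME"] = "/Users/a6/Applications/spark-2.1.0-bin-hadoop2.6"
# make the pyspark package and its bundled py4j importable
sys.path.append(os.path.join(os.environ["SPARK_HOME"], "python"))
sys.path.append(os.path.join(os.environ["SPARK_HOME"],
                             "python/lib/py4j-0.10.4-src.zip"))

from pyspark import SparkConf, SparkContext  # now resolves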
(4) Run the program; the console output and the results are as follows:
1) The console output is as follows:
/System/Library/Frameworks/Python.framework/Versions/2.7/bin/python2.7 /Users/a6/Downloads/PycharmProjects/userid_hbsid_map_final/filter_and_retain_lastest_hbsid_state.py
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
17/11/07 19:50:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/11/07 19:50:48 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 10.2.32.209 instead (on interface en0)
17/11/07 19:50:48 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Process finished with exit code 0
2) The results on HDFS are as follows:
localhost:input_local a6$ hadoop fs -cat hdfs://localhost:9002/output/2017-11.res/*
17/11/07 19:57:49 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
AA 10001;2017-10-06
BB 10002;2017-10-07
CC 10003;2017-10-08
AA 10004;2017-10-09
DD 10003;2017-10-20
AA 10009;2017-11-06
BB 10002;2017-11-07
CC 10004;2017-11-08
AA 10006;2017-11-09
DD 10003;2017-11-20
localhost:input_local a6$
Note the different path formats for the two kinds of input directories:
# HDFS path:
input_data_path = "hdfs://localhost:9002/input/2017-11-01*"
# local path:
input_data_path = "file:///Users/a6/Downloads/PycharmProjects/speiyou_di_my/userid_hbsid_map_1107_final/input_local/2017-11-01*"
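If you switch between the two often, a small toggle keeps both paths in one script. A sketch (the USE_HDFS flag is an illustrative name of mine, not part of the original code):

# toggle between the two input sources; USE_HDFS is a hypothetical flag
USE_HDFS = False

if USE_HDFS:
    # HDFS paths name the namenode host and port (9002 in this setup)
    input_data_path = "hdfs://localhost:9002/input/2017-11-01*"
else:
    # local files need the file:// scheme; without it Spark resolves the
    # path against the default filesystem, which may be HDFS
    input_data_path = "file:///Users/a6/Downloads/PycharmProjects/speiyou_di_my/userid_hbsid_map_1107_final/input_local/2017-11-01*"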