1. 程式人生 > >pySpark讀寫CSV檔案、查重並寫入CSV檔案中

pySpark讀寫CSV檔案、查重並寫入CSV檔案中

前段時間在研究 pySpark 資料處理,深刻地感受到spark的極大魅力。自己是一個初學者,這篇部落格也只是簡單的完成了幾個簡單的工作任務,哈哈@@

不說了上程式碼:

from pyspark import SparkConf,SparkContext import csv from _operator import add import re #!/usr/bin/python # -*- coding: UTF-8 -*- conf = SparkConf().setAppName("Simple").setMaster("local") sc = SparkContext(conf = conf) file1 = open("D:\\lbossdata.CSV","r") f1 = csv.reader(file1)  //開啟csv檔案將檔案的第一行的第一列讀入到row1 row1 = [row[0] for row in f1] file1.close() with open("D:\\lbossdata.CSV","r") as file2: f2 = csv.reader(file2) row2 = [row[1] for row in f2] with open("D:\\lbossdata.CSV","r") as file3: f3 = csv.reader(file3) row3 = [row[2] for row in f3]

通第一個方法一樣,將其餘幾行也分別讀入到row2\row3,row4,row5,row6。

最先應將讀出來的每一個row放入資料集中,將每一個RDD聯結為一個RDD。

然後開始去重,呼叫distinct()方法,程式碼如下:

RDD1 = sc.parallelize(row1, 1)
RDD2 = sc.parallelize(row2, 1)
RDD3 = sc.parallelize(row3, 1)
RDD4 = sc.parallelize(row4, 1)
RDD5 = sc.parallelize(row5, 1)
RDD6 = sc.parallelize(row6, 1)
RDD = RDD1.zip(RDD2).zip(RDD3).zip(RDD4).zip(RDD5).zip(RDD6)
print(RDD.count())
RDD_1= RDD.distinct()
print(RDD_1.count())
RDD1_1 = RDD_1.map(lambda x:(x[0][0][0][0][0],x[0][0][0][0][1],x[0][0][0][1],x[0][0][1],x[0][1],x[1]))

最後一行是將每一個RDD中的資料使用逗號分隔開,以便儲存到csv檔案中去。

最後在本地建立一個csv檔案,將上邊的RDD1_1寫入csv檔案即可。

with open("D:\\z.CSV","w",newline="") as file_1:
     f_csv = csv.writer(file_1)
     f_csv.writerows(RDD1_1.collect())

這就是所有的程式碼,速度相當快。大概用時不到1秒。@@

第一次寫部落格,感覺還行,不知道程式碼怎麼貼入,總是亂貼,還請大家包涵。