1. 程式人生 > >最新版Spark2.2讀取多種檔案格式資料

最新版Spark2.2讀取多種檔案格式資料

Spark2.0+的檔案讀取

Spark可以讀取多種格式檔案,csv,json,parque。因此對應就有很多函式與之對應。在Spark2.0以後一般使用SparkSession來操作DataFrame、Dataset來完成資料分析。這些讀取不同格式檔案的函式就是SparkSession的成員DataFrameReader的方法。該類就是將檔案系統(HDFS,LocalFileSystem(一定要在每臺機器上都有的檔案,不然會找不到檔案,因為不確定executor會在哪臺機器上執行,如果是本地檔案,執行executor機器上一定要有該檔案))中的檔案讀取到Spark中,生成DataFrame的類。下面來看看具體的檔案讀取。

1.CSV

其實該方法叫CSV不是很好,因為它不止可以讀CSV檔案,他可以讀取一類由分隔符分割資料的檔案,由於這類檔案中CSV是代表,所以該函式才叫CSV吧。
1.1標準CSV
csv資料
特徵:有空值?表示,有表頭,型別明確

"id_1","id_2","cmp_fname_c1","cmp_fname_c2","cmp_lname_c1","cmp_lname_c2","cmp_sex","cmp_bd","cmp_bm","cmp_by","cmp_plz","is_match"
41264,44629,1,?,1,?,1,1,1,1,1,TRUE
28871,41775
,1,?,1,?,1,1,1,1,1,TRUE 99344,99345,1,?,1,?,1,1,1,1,1,TRUE 31193,66985,1,?,1,?,1,1,1,1,0,TRUE 24429,25831,1,?,1,?,1,1,1,1,1,TRUE 23571,49029,1,?,1,?,1,1,1,1,1,TRUE 6884,6885,1,?,1,?,1,1,1,1,1,TRUE 7144,9338,1,?,1,?,1,1,1,1,1,TRUE

spark程式碼:

val spark = SparkSession.builder().appName("fileRead").getOrCreate()
        import
spark.implicits._ val data1 = spark.read // 推斷資料型別 .option("inferSchema", true) // 設定空值 .option("nullValue", "?") // 表示有表頭,若沒有則為false .option("header", true) // 檔案路徑 .csv("ds/block_10.csv") // 快取 .cache() // 列印資料格式 data1.printSchema() // 顯示資料,false引數為不要把資料截斷 data1.show(false)

效果:
這裡寫圖片描述
1.2TSV
TSV資料
特徵:無頭,有資料型別,\t分割

196 242 3   881250949
186 302 3   891717742
22  377 1   878887116
244 51  2   880606923
166 346 1   886397596
298 474 4   884182806
115 265 2   881171488
253 465 5   891628467
305 451 3   886324817
6   86  3   883603013

spark程式碼:

val cols = Seq("user_id", "item_id", "rating", "timestamp")
        val data2 = spark.read
            //          推斷資料型別
            .option("inferSchema", true)
            //          沒有表頭false
            .option("header", false)
            //          指定分隔符
            .option("delimiter", "\t")
            .csv("movie/u.data")
            //          設定頭
            .toDF(cols: _*)
            .cache()
        data2.printSchema()
        data2.show()
        //      計數
        data2.count()

結果:
這裡寫圖片描述

2.JSON檔案

JSON不像CSV那樣,他是半結構化的資料,因此他可以表示更加複雜的資料型別,但是缺點也同樣明顯,儲存同樣的資料,JSON檔案更大。
資料:有點複雜,介紹一下
軌跡ID long,使用者ID long,time timestamp,td string,trail [id int ,ts long,alt double,lon double ,alt double,d string]
主要就是大物件裡面有一個數組,數組裡面有很多小物件(數量不固定),csv是難以表示這種資料的。
這裡寫圖片描述
但是。。。處理起來很簡單

val jsonpath = "/home/wmx/hive/warehouse/trail/sample40.json"
val data3 = spark.read.json(jsonpath).cache()
data3.printSchema()
// 因為有點多隻顯示1條,不截斷
data3.show(1,false)

結果
這裡大家可以看到,時間戳資料被解析成string了,但是spark內建的資料型別是支援Date的
這裡寫圖片描述
因此要處理資料型別:
改為

//      按順序把型別全寫下來
        val schema: StructType = StructType(Seq(
            StructField("tid", IntegerType, true),
            StructField("uid", IntegerType, true),
            StructField("st", TimestampType, true),
            StructField("td", StringType, true),
            StructField("trail", ArrayType(StructType(Seq(
                StructField("id", IntegerType, true),
                StructField("ts", LongType, true),
                StructField("lat", DoubleType, true),
                StructField("alt", DoubleType, true),
                StructField("lon", DoubleType, true),
                StructField("d", StringType, true)))), true)));
        val data4 = spark.read
            .schema(schema)
            .json(jsonpath)
            .cache()
        data4.printSchema()
        data4.show(1, true)

結果:
型別完全匹配:
這裡寫圖片描述
最後準備寫一下parquet,但是parquet本人不是很熟,只知道parquet使用的函式是load(path:String),希望對大家有所幫助。
列式儲存

列式儲存和行式儲存相比有哪些優勢呢?

可以跳過不符合條件的資料,只讀取需要的資料,降低IO資料量。
壓縮編碼可以降低磁碟儲存空間。由於同一列的資料型別是一樣的,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約儲存空間。
只讀取需要的列,支援向量運算,能夠獲取更好的掃描效能。