1. 程式人生 > >spark1.6使用:讀取本地外部資料,把RDD轉化成DataFrame,儲存為parquet格式,讀取csv格式

spark1.6使用:讀取本地外部資料,把RDD轉化成DataFrame,儲存為parquet格式,讀取csv格式

一、先開啟Hadoop和spark

二、啟動spark-shell

spark-shell --master local[2] --jars /usr/local/src/spark-1.6.1-bin-hadoop2.6/libext/com.mysql.jdbc.Driver.jar

1.讀取spark目錄下面的logs日誌作為測試:

val alllog=sc.textFile("file:///usr/local/src/spark-1.6.1-bin-hadoop2.6/logs/*out*")

alllog.count 看看一共有347記錄

2.轉為為DataFrame

現在讀取進來的是RDD格式,用map函式把每條記錄轉成一行

import org.apache.spark.sql.Row

val alllogRDD=alllog.map(x =>Row(x))

import org.apache.spark.sql.types._

val schemaString="line"

val schema=StructType(schemaString.split(" ").map(fieldName =>StructField(fieldName,StringType,true)))

val alllogDataFrame = sqlContext.createDataFrame(alllogRDD, schema)

alllogDataFrame.printSchema  #列印schema
alllogDataFrame.show(false) #這裡的false表示不省略,否則跟下午一樣,會三點省略

到此為止,已經把RDD轉化為DataFrame了。

三、把DataFrame轉為為表用SQL查詢

alllogDataFrame.registerTempTable("log")

sqlContext.sql("SELECT * FROM log").show()

到此就可以使用SQL了。

四、讀取和儲存外部資料來源

1.讀取json檔案

 val df = sqlContext.read.format("json").load("file:///mnt/hgfs/vm/china.json")

df.printSchema

 df.select("*").write.format("parquet").mode("overwrite").save("file:///mnt/hgfs/vm/china.parquet") #儲存為parquet格式

這裡的mode可以有overwrite,append,ignore等模式,也可以不用。

這樣就直接生產DataFrame資料,不用新增schema資訊了。

對於parquet檔案,還有更高階的使用方法,直接讀取檔案就行了

 val df = sqlContext.sql("SELECT * FROM parquet.`examples/src/main/resources/users.parquet`")

對於json裡面有巢狀陣列,想要展開成多行,可以在SQL中使用explode函