1. 程式人生 > >spark讀取日誌檔案,把RDD轉化成DataFrame

spark讀取日誌檔案,把RDD轉化成DataFrame

一、先開啟Hadoop和spark

二、啟動spark-shell

spark-shell --master local[2] --jars /usr/local/src/spark-1.6.1-bin-hadoop2.6/libext/com.mysql.jdbc.Driver.jar

1.讀取spark目錄下面的logs日誌作為測試:

val alllog=sc.textFile("file:///usr/local/src/spark-1.6.1-bin-hadoop2.6/logs/*out*")

alllog.count 看看一共有347記錄

2.轉為為DataFrame

現在讀取進來的是RDD格式,用map函式把每條記錄轉成一行

import org.apache.spark.sql.Row

val alllogRDD=alllog.map(x =>Row(x))

import org.apache.spark.sql.types._

val schemaString="line"

val schema=StructType(schemaString.split(" ").map(fieldName =>StructField(fieldName,StringType,true)))

val alllogDataFrame = sqlContext.createDataFrame(alllogRDD, schema)

alllogDataFrame.printSchema  #列印schema
alllogDataFrame.show(false) #這裡的false表示不省略,否則跟下午一樣,會三點省略

到此為止,已經把RDD轉化為DataFrame了。

三、把DataFrame轉為為表用SQL查詢

alllogDataFrame.registerTempTable("log")

sqlContext.sql("SELECT * FROM log").show()

到此就可以使用SQL了。