1. 程式人生 > spark sql 小樣

spark sql 小樣

parquet struct 讀取文件 cit per hive java sources lin

package dev.spark.sql

import java.util.Properties

import org.apache.spark.sql.{Row, SQLContext, SaveMode}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark SQL (1.x API) walkthrough: loading JSON / Parquet / JDBC sources into DataFrames,
 * querying them through temporary tables, joining with Hive tables, building a DataFrame
 * from an RDD of Rows with an explicit schema, and converting an RDD of case-class
 * instances with `toDF()`.
 *
 * Optional arguments:
 *   args(0) - path of the JSON input    (defaults to ".json")
 *   args(1) - path of the Parquet input (defaults to ".parquet")
 */
object DataFrame {

  /** Upper bound used by the `col <= num` filters below. */
  val num = 0

  /**
   * JDBC options for the MySQL source table.
   * NOTE(review): credentials are hard-coded; move them to configuration before real use.
   */
  val map = scala.collection.immutable.Map(
    "url" -> "jdbc:mysql://192.168.0.1:3306/spark",
    "dbtable" -> "tmp_table3",
    "user" -> "spark",
    "password" -> "spark")

  /**
   * Row type for the `toDF()` example. It must be declared outside `main`: a case class
   * local to a method has no TypeTag, so Spark's implicits cannot derive its schema.
   */
  case class RowKeyClass(rowkey: Int)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("dataFrame")
    val sc = new SparkContext(conf)
    try {
      val ssc = new SQLContext(sc)
      val jsonPath = args.headOption.getOrElse(".json")
      val parquetPath = args.drop(1).headOption.getOrElse(".parquet")

      // Two equivalent ways to load JSON: the dedicated reader method or generic format/load.
      val df = ssc.read.json(jsonPath)
      ssc.read.format("json").load(jsonPath).printSchema()

      // show() prints the data set; filter() restricts it with a column predicate.
      df.show()
      df.filter(df.col("col") <= num).show()

      // Register the DataFrame as a temporary table so it can be queried with SQL.
      df.registerTempTable("tmp_table0")
      // The query must reference the name registered above; the result is a DataFrame of Rows.
      val dataSet0 = ssc.sql(s"SELECT col FROM tmp_table0 WHERE col <= $num")
      // A few of the many DataFrame operations.
      dataSet0.collect().foreach(println)
      dataSet0.columns.foreach(println)
      dataSet0.rdd.foreach(println)
      dataSet0.explain()
      dataSet0.cache()
      dataSet0.na.drop().show()

      // Formatted reads through the same SQLContext: Parquet ...
      ssc.read.format("parquet").load(parquetPath).show()
      // ... and JDBC, configured entirely by the options map.
      val dataSet3 = ssc.read.format("jdbc").options(map).load()

      // Writing needs the credentials as connection properties. The target is a separate
      // table: with the default SaveMode (ErrorIfExists) writing back into the source table
      // always fails, and appending to it would duplicate its rows.
      val jdbcProps = new Properties()
      jdbcProps.setProperty("user", map("user"))
      jdbcProps.setProperty("password", map("password"))
      dataSet3.write.jdbc(map("url"), map("dbtable") + "_copy", jdbcProps)

      // HiveContext picks up hive-site.xml from resources. Temporary tables shadow Hive tables
      // of the same name, so qualify Hive tables as database.table (default database: default).
      val hssc = new HiveContext(sc)
      val dataSet1 = hssc.sql("SELECT col FROM database.table")
      dataSet1.registerTempTable("tmp_table1")
      // Temporary tables belong to the context that registered them: tmp_table0 lives in ssc,
      // so it is registered on hssc as well before the two tables are joined there.
      hssc.createDataFrame(dataSet0.rdd, dataSet0.schema).registerTempTable("tmp_table0")
      hssc.sql("SELECT * FROM tmp_table0 t0 inner join tmp_table1 t1 on t0.col = t1.col").show()

      // Map each Row to a new Row that matches an explicit schema, then build a DataFrame from it.
      val rowKeyRdd = dataSet0.rdd.map(line => Row(line.size))
      val rowkeyStructField = StructField("rowkey", IntegerType, nullable = true)
      val tableStructType = StructType(Seq(rowkeyStructField))
      val dataSet2 = hssc.createDataFrame(rowKeyRdd, tableStructType)
      dataSet2.registerTempTable("tmp_table2")
      dataSet2.write.mode(SaveMode.Append).saveAsTable("hive_spark.tmp_table2")

      // Converting an RDD of case-class instances to a DataFrame needs the context's implicits.
      import ssc.implicits._
      dataSet0.rdd.map(x => RowKeyClass(x.size)).toDF().show()
    } finally {
      // Release the local Spark resources even if one of the steps above fails.
      sc.stop()
    }
  }
}

spark sql 小樣