1. 程式人生 > >SparkSQL(5)——Spark SQL程式設計方式執行查詢

SparkSQL(5)——Spark SQL程式設計方式執行查詢

編寫Spark SQL程式實現RDD轉換成DataFrame

Spark官網提供了兩種方法來實現從RDD轉換得到DataFrame,第一種方法是利用反射機制,推導包含某種型別的RDD,通過反射將其轉換為指定型別的DataFrame,適用於提前知道RDD的schema。第二種方法通過程式設計介面與RDD進行互動獲取schema,並動態建立DataFrame,在執行時決定列及其型別。
一、新增maven依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

通過反射推斷Schema

Scala支援使用case class型別匯入RDD轉換為DataFrame,通過case class建立schema,case class的引數名稱會被利用反射機制作為列名。這種RDD可以高效的轉換為DataFrame並註冊為表。
package com.fgm.sparksql

import org.apache.spark.sql.SparkSession

//利用反射,將rdd轉換成dataFrame
case class Person(val id:Int,val name:String,val age:Int)


object SchemaDemo {
  def main(args: Array[String]): Unit = {

    //建立SparkSession物件
    val sparkSession = SparkSession.builder().appName("Schema").master("local[2]").getOrCreate()
    //建立SparkContext物件
    val sc = sparkSession.sparkContext
    sc.setLogLevel("WARN")
    //讀取資料檔案
    val rdd1 = sc.textFile("D:\\tmp\\person.txt").map(_.split(" "))
    //將rdd與樣例類關聯
    val personRDD = rdd1.map(x=>Person(x(0).toInt,x(1),x(2).toInt))
    //將personRDD轉換成DataFrame,需匯入隱式轉換
    import sparkSession.implicits._
    val personDF = personRDD.toDF()
    //dataFrame操作
    //DSL風格
    personDF.printSchema()
    personDF.show()
    personDF.select("name","age").show()
    personDF.select($"age">30).show()

    //sql風格語法
    personDF.createTempView("person")
    sparkSession.sql("select * from person").show()
    sparkSession.sql("select * from person where age>30").show()
    sparkSession.sql("select * from person where id=3").show()


    sparkSession.stop()
  }
}

通過StructType直接指定Schema

當case class不能提前定義好時,可以通過以下三步建立DataFrame
(1)將RDD轉為包含Row物件的RDD
(2)基於StructType型別建立schema,與第一步建立的RDD相匹配
(3)通過sparkSession的createDataFrame方法對第一步的RDD應用schema建立DataFrame

package com.fgm.sparksql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  *通過StructType指定schema,將rdd轉換成dataFrame
  * @Auther: fgm
  */
object StructTypeSchema {
  def main(args: Array[String]): Unit = {
    //建立SparkSession物件
    val spark = SparkSession.builder().appName("StructTypSchema").master("local[2]").getOrCreate()
    //建立SparkContext
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    //讀取資料
    val rdd1 = sc.textFile("D:\\tmp\\person.txt").map(_.split(" "))
    //將rdd與rowd物件關聯
    val rowRDD = rdd1.map(x=>Row(x(0).toInt,x(1),x(2).toInt))

    //指定schema
    val schema=(new StructType).add(StructField("id",IntegerType,true))
      .add(StructField("name",StringType,false))
      .add(StructField("age",IntegerType,true))

    val dataFrame = spark.createDataFrame(rowRDD,schema)
    dataFrame.printSchema()
    dataFrame.show()

    dataFrame.createTempView("person")
    spark.sql("select * from person").show()

    spark.stop()
  }
}

編寫程式操作HiveContext

HiveContext是對應spark-hive這個專案,與hive有部分耦合, 支援hql,是SqlContext的子類,在Spark2.0之後,HiveContext和SqlContext在SparkSession進行了統一,可以通過操作SparkSession來操作HiveContext和SqlContext。

新增依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>2.1.3</version>
</dependency>

實現

package com.fgm.sparksql

import org.apache.spark.sql.SparkSession

/**
  *SparkSql操作
  *
  * @Auther: fgm
  */
object HiveSparkSql {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HiveSparkSql").master("local[2]").enableHiveSupport().getOrCreate()
    spark.sql("create table user(id int,name string,age int) row format delimited fields terminated by ','")
    spark.sql("load data local inpath './data/user.txt' into table user")
    spark.sql("select * from user").show()

    spark.stop()
  }

}

注意:這裡首先在專案根目錄下建立data目錄(和src同級),然後在data中穿件user.txt檔案,並寫入相關資料(1,zhangsan,22)。不然會報錯。另外需要開啟HiveSupport服務:enableHiveSupport()
在這裡插入圖片描述