1. 程式人生 > >Spark SQL的介紹和DataFrame的建立及使用

Spark SQL的介紹和DataFrame的建立及使用

 

1.

Spark SQL定位處理結構化資料的模組。SparkSQL提供相應的優化機制,並支援不同語言的開發API。
java、scala、Python,類SQL的方法呼叫(DSL)
2.

RDD與Spark SQL的比較說明:
  使用Spark SQL的優勢:a.面向結構化資料;b.優化機制;
  RDD缺點:a.沒有優化機制,如對RDD執行Filter操作;
     b.RDD型別轉換後無法進行模式推斷
3.

DataFrame/SchemaRDD
  DataFrame是一個分散式的資料集合,該資料集合以命名列的方式進行整合。
  Dateframe=RDD(資料集)+Schema(元資料/模型)


  SchemaRDD就是DataFrame的前身,在1.3.0版本後。
  DataFrame存放的是ROW物件。每個Row 物件代表一行記錄。      

  SchemaRDD還包含記錄的結構資訊(即資料欄位)
4.

建立Spark SQL環境
  a.將SparkSQL依賴庫新增至pom.xml檔案中
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.1.2</version>


    </dependency>
  b.建立SparkSQL Context-->SparkSession
    通過SparkSession.builder()建立構造器;
    並呼叫.appName("sparkSQL").master("local")設定叢集模式以及app名稱
    最後必須呼叫getOrCreate()方法建立SparkSession物件。
    val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
  c.載入外部資料來源:

    通過SparkSession的read()方法載入不同的資料來源:json、CVS、jdbc、textfile、parquert等
    val df = spark.read.textFile("file:///d:/測試資料/users.txt").toDF()
    df.show()

DF的建立方式

 

  

  (1)通過SparkSession的createDataFrame(...)方法建立DF物件
    a.將Seq序列轉換為DF
    b.將RDD[Product]多元素轉換為DF
  (2)通過SparkSession的read讀取外部檔案呼叫toDF()
  (3)通過匯入隱式轉換,可直接將Scala中的序列轉換為DF
    val spark = SparkSession.builder().appName("sparkSQL").master("local").getOrCreate()
    import spark.implicits._
    val list = List(("zhangsan",12,"changchun"),("lilei",25,"haerbin"))
    val df_implicits = list.toDF()

 檢視DF的Schema

 

 

1.案例說明:
  val rdd = sc.textFile("file:///d:/測試資料/users.txt").map(x=>x.split(" ")).map(x=>(x(0),x(1),x(2)))
  val df_rdd = spark.createDataFrame(rdd)
  df_rdd.show()
  df_rdd.select("_1","_2").where("_1 like '%o%'").show()
  df_rdd.printSchema()
    root
      |-- _1: string (nullable = true)
      |-- _2: string (nullable = true)
      |-- _3: string (nullable = true)
  通過case用例類可以對DF進行Schema匹配
  case class Person(name:String,age:Int,address:String)

  val rdd = sc.textFile("file:///d:/測試資料/users.txt").map(x=>x.split(" ")).map(x=>new Person(x(0),x(1).toInt,x(2)))
  val df_rdd = spark.createDataFrame(rdd)
  df_rdd.printSchema()
    root
      |-- name: string (nullable = true)
      |-- age: integer (nullable = true)
      |-- address: string (nullable = true)
  df_rdd.show()
    +------+---+-------+
    | name|age|address|
    +------+---+-------+
    | anne| 22| NY|
    | joe| 39| CO|
    |alison| 35| NY|
    +------+---+-------+
2.實現簡單的select操作
  df_rdd.select("name","age").where("name like '%o%'").show()
    +------+---+
    | name|age|
    +------+---+
    | joe| 39|
    |alison| 35|
    | bob| 71|
    +------+---+

 DF的操作方式

 

 

1.顯示:
  df_rdd.show()
2.查詢
  df_rdd.select("name").show()
3.條件查詢:
  df_rdd.select($"name",$"age").where("name like '%o%'").show() //注:引入spark.implicits._
    +------+---+
    | name|age|
    +------+---+
    | joe| 39|
    |alison| 35|
    | bob| 71|
    +------+---+
4.條件查詢:
  df_rdd.select($"name",$"age"+1).where("name like '%o%'").show() //$是scala的用法,需要隱式轉換 import spark.implicits._
    +------+---------+
    | name|(age + 1)|
    +------+---------+
    | joe| 40|
    |alison| 36|
    | bob| 72|
    +------+---------+
5.過濾操作
  a.通過過濾表示式:
    df_rdd.filter("age > 36").show()
  b.通過func式程式設計進行處理,DF中每個元素均為ROW
    df_rdd.filter(x=>{if(x.getAs[Int]("age") > 36) true else false }).show()
6.分組操作
    df_rdd.groupBy("address").count().show
      +-------+-----+
      |address|count|
      +-------+-----+
      | OR| 2|
      | VA| 2|
      | CA| 2|
      | NY| 3|
      | CO| 1|
      +-------+-----+