
《深入理解Spark》: Converting Between RDDs and DataFrames

package com.lyzx.day18

import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

/**
 * Spark SQL
 * Converting between RDDs and DataFrames
 */
class T4 {

  /*
    Convert an RDD[User] to a DataFrame via reflection
   */
  def f1(sc:SparkContext): Unit ={
    val sqlCtx = new SQLContext(sc)
//  read the text file and convert it into an RDD[User]
    val rdd = sc.textFile("User.txt")
    val userRdd = rdd.map(item=>item.split(","))
                     .map(item=>User(item(0).toInt,item(1),item(2).toInt,item(3).toInt))

//  bring the implicit conversion functions into scope
    import sqlCtx.implicits._

//  convert the RDD[User] to a DataFrame; the column names cannot be specified here because reflection is used, so they are taken from the field names of User
    val df = userRdd.toDF()
//  register the DataFrame as a temporary table, i.e. "put" the data in df into a temp table and give it a name
    df.registerTempTable("user")

//    write SQL through the SQLContext instance and get back a DataFrame containing the result set
    val result = sqlCtx.sql("select id,name,age,height from user where id >=2")

//    iterate over the result set
    result.foreach(println)
  }
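
  /*
    Sketch (not part of the original code; the method name f1Reverse is illustrative):
    the reverse direction, DataFrame -> RDD. DataFrame.rdd returns an RDD[Row], from
    which an RDD[User] can be rebuilt by reading the columns back in the order produced
    by the reflection-based schema used in f1.
   */
  def f1Reverse(sc:SparkContext): Unit ={
    val sqlCtx = new SQLContext(sc)
    import sqlCtx.implicits._

    val df = sc.textFile("User.txt")
               .map(_.split(","))
               .map(item=>User(item(0).toInt,item(1),item(2).toInt,item(3).toInt))
               .toDF()

    // df.rdd is an RDD[Row]; rebuild the case class instances by column position
    val userRdd = df.rdd.map(row=>User(row.getInt(0),row.getString(1),row.getInt(2),row.getInt(3)))
    userRdd.foreach(println)
  }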

  /*
    Convert an RDD to a DataFrame dynamically
    The schema can be specified at runtime ("schema" is Spark's term for it; it is really just column names + types + nullability)
   */
  def f2(sc:SparkContext): Unit ={
      val sqlCtx = new SQLContext(sc)

      val rdd = sc.textFile("./User.txt")
      val mapRdd = rdd.map(item=>item.split(","))
                      .map(item=>Row(item(0),item(1),item(2),item(3)))


    def getSchema2(columnName:String): StructType ={
      StructType(columnName.split(",").map(item=>StructField(item,StringType,true)))
    }

    //this is the schema: column names + types + nullability
    val schema = getSchema2("id_x,name_y,age_z,height_m")

    //create the DataFrame through the SQLContext instance
    val df = sqlCtx.createDataFrame(mapRdd,schema)
    df.registerTempTable("user")

    val result = sqlCtx.sql("select id_x,name_y,age_z,height_m from user where id_x >=3")
    result.foreach(println)
  }
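
  /*
    Sketch (not part of the original code; the method name f2Typed and the table name
    user_typed are illustrative): in f2 every column is typed as StringType, so the
    "id_x >= 3" filter relies on Spark SQL casting the values at query time. If the
    numeric columns should keep numeric types, the schema and the Row values must agree:
   */
  def f2Typed(sc:SparkContext): Unit ={
    val sqlCtx = new SQLContext(sc)

    val rowRdd = sc.textFile("./User.txt")
                   .map(_.split(","))
                   .map(item=>Row(item(0).toInt,item(1),item(2).toInt,item(3).toInt))

    // column types now match the values placed into each Row above
    val schema = StructType(Seq(
      StructField("id",IntegerType,true),
      StructField("name",StringType,true),
      StructField("age",IntegerType,true),
      StructField("height",IntegerType,true)))

    val df = sqlCtx.createDataFrame(rowRdd,schema)
    df.registerTempTable("user_typed")

    sqlCtx.sql("select id,name,age,height from user_typed where id >= 3").foreach(println)
  }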


  def f3(sc:SparkContext): Unit ={
    val sqlCtx = new SQLContext(sc)

    val userRdd = sc.textFile("./User.txt")
                    .map(x=>x.split(","))
                    .map(x=>Row(x(0),x(1),x(2),x(3)))

    def getSchema2(columnName:String): StructType ={
      StructType(columnName.split(",").map(item=>StructField(item,StringType,true)))
    }

    val userSchema = getSchema2("id,name,age,height")
    val userDf = sqlCtx.createDataFrame(userRdd,userSchema)
        userDf.registerTempTable("user")

    val goodsRdd = sc.textFile("./goods.txt")
                     .map(x=>x.split(","))
                     .map(x=>Row(x(0),x(1),x(2),x(3)))

    val goodsSchema = getSchema2("userId,goodsName,goodsPrice,goodsCount")

    val goodsDf = sqlCtx.createDataFrame(goodsRdd,goodsSchema)
        goodsDf.registerTempTable("goods")

    val result = sqlCtx.sql("select a.id as userId,a.name as userName,b.goodsName,b.goodsPrice from user a left join goods b on a.id=b.userId")
    result.foreach(println)
  }
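
  /*
    Sketch (not part of the original code; f3Api and schemaOf are illustrative names):
    the same left join written with the DataFrame API instead of SQL. File paths and
    column names follow the schemas used in f3; the input files are assumed to be
    comma-separated with one record per line.
   */
  def f3Api(sc:SparkContext): Unit ={
    val sqlCtx = new SQLContext(sc)

    def schemaOf(columnNames:String): StructType =
      StructType(columnNames.split(",").map(StructField(_,StringType,true)))

    val userDf  = sqlCtx.createDataFrame(
      sc.textFile("./User.txt").map(_.split(",")).map(x=>Row(x(0),x(1),x(2),x(3))),
      schemaOf("id,name,age,height"))

    val goodsDf = sqlCtx.createDataFrame(
      sc.textFile("./goods.txt").map(_.split(",")).map(x=>Row(x(0),x(1),x(2),x(3))),
      schemaOf("userId,goodsName,goodsPrice,goodsCount"))

    // left outer join on user.id == goods.userId, keeping a few columns from each side
    val joined = userDf.join(goodsDf, userDf("id") === goodsDf("userId"), "left_outer")
                       .select(userDf("id"), userDf("name"), goodsDf("goodsName"), goodsDf("goodsPrice"))
    joined.foreach(println)
  }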

  /*
   JSON data source
   SQLContext can read JSON-formatted text files directly
  */
  def f4(sc:SparkContext): Unit ={
    val sqlCtx = new SQLContext(sc)
    val jsonDf = sqlCtx.read.json("./json.txt")
    jsonDf.printSchema()

    jsonDf.registerTempTable("person")

    val df = sqlCtx.sql("select * from person where age > 10")
    df.foreach(println)
  }
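
  /*
    Assumed input format (hypothetical sample, not part of the original): read.json
    expects one JSON object per line in a plain text file, e.g.
      {"name":"Tom","age":18}
      {"name":"Jack","age":25}
    The schema printed by printSchema() is inferred from these objects.
   */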
}

object T4{
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("day18").setMaster("local")
    val sc = new SparkContext(conf)

    val  t = new T4
//    t.f1(sc)
//    t.f2(sc)
//    t.f3(sc)
    t.f4(sc)

    sc.stop()
  }
}

case class User(id:Int,name:String,age:Int,height:Int){
  // case class parameters are already vals, so they can be used directly in toString
  override def toString(): String ={
    "[id="+id+" name="+name+" age="+age+" height="+height+"]"
  }
}