1. 程式人生 > >RDD轉換為DataFrame【反射/編程】

RDD轉換為DataFrame【反射/編程】

pac ESS cas == its 選擇 stop csdn auth

寫在前面
主要是加載文件為RDD,再把RDD轉換為DataFrame,進而使用DataFrame的API或Sql進行數據的方便操作

簡單理解:DataFrame=RDD+Schema

貼代碼

package february.sql

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

/**
  * Description:  ============Spark SQL支持兩種不同的方法將現有RDD轉換為Datasets數據集==============
  *
  *
  * (1) 反射 case class   前提:事先需要知道你的字段,字段類型
  * (2) 編程              事先不知道有哪幾列
  *   ****  優先選擇第一種 ****
  *
  * @Author: 留歌36
  * @Date: 2019/2/25 18:41
  */
object DataFrameRDDApp {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
                            .appName(this.getClass.getSimpleName)
                            .master("local[2]")
                            .getOrCreate()
    // 方法一:反射
//    inferReflection(spark)

    // 方法二:編程
    program(spark)

    spark.stop()

  }

  /**
    * 編程的方式
    * @param spark
    */
  private def program(spark: SparkSession) = {
    val textFile = spark.sparkContext.textFile("f:\\infos.txt")

    val infoRdd = textFile.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))

    val structType = StructType(Array(
      StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))

    val DF =spark.createDataFrame(infoRdd, structType)
    DF.printSchema()

    DF.show()

  }


  /**
    * 反射的方式
    * @param spark
    */
  private def inferReflection(spark: SparkSession) = {
    // RDD ==> DataFrame  rdd.toDF()
    val textFile = spark.sparkContext.textFile("f:\\infos.txt")
    // split()返回 String[]
    // 註意:需要導入隱式轉換
    import spark.implicits._
    val infoDF = textFile.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    // =====基於dataframe的API=======之後的就都是DataFrame 的操作了==============
    infoDF.show()

    infoDF.filter(infoDF.col("age") > 30).show()

    // ======基於SQL的API===========DataFrame 創建為一張表================
    infoDF.createOrReplaceTempView("infos")
    spark.sql("select * from infos where age > 30").show()
  }

  //類似java bean實體類
  // 反射的方式,將RDD的 每個字段 與 這裏的實體類 進行一一映射
  case class Info(id: Int, name: String, age: Int)


}

更多相關小demo:每天一個程序:https://blog.csdn.net/liuge36/column/info/34094

RDD轉換為DataFrame【反射/編程】