Spark SQL: Converting an RDD to a Dataset by Programmatically Specifying the Schema

I: Explanation

Official docs: https://spark.apache.org/docs/latest/sql-getting-started.html
This scenario comes up all the time in practice:
When case classes cannot be defined ahead of time (for example, the structure of records is encoded in a string, or a text dataset will be parsed and fields will be projected differently for different users), a DataFrame can be created programmatically with three steps.

1. Create an RDD of Rows from the original RDD;
2. Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1;
3. Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession.
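
The "structure encoded in a string" case mentioned above deserves a concrete illustration: the schema itself can be assembled at runtime from a plain string of column names. A minimal sketch following the official guide's pattern (the schemaString value here is a made-up example):

import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical: the field names arrive as a string that is only known at runtime
val schemaString = "ip time responseSize"
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)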

II: Implementation

package g5.learning

import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object DFApp {
  def main(args: Array[String]): Unit = {
    val sparksession = SparkSession.builder().appName("DFApp")
      .master("local[2]")
      .getOrCreate()

     //inferReflection(sparksession)

    programmatically(sparksession)
    sparksession.stop()
  }



  // The second approach: specify the schema programmatically
  def programmatically(sparkSession: SparkSession): Unit = {
    val info = sparkSession.sparkContext.textFile("file:///E:\\data.txt")
    // Step 1: Create an RDD of Rows from the original RDD
    // (each tab-separated line becomes a Row of (String, String, Long))
    val rdd = info.map(_.split("\t")).map(x => Row(x(0), x(1), x(2).toLong))
    // Step 2: Create the schema as a StructType matching the structure of the Rows
    val struct = StructType(Array(
      StructField("ip", StringType, nullable = true),
      StructField("time", StringType, nullable = false),
      StructField("responseSize", LongType, nullable = false)
    ))
    // Step 3: Apply the schema to the RDD of Rows via the createDataFrame method provided by SparkSession
    val df = sparkSession.createDataFrame(rdd, struct)
    df.show()
  }
}
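
Once the DataFrame exists, it can be queried like any other Spark SQL table. A short follow-up sketch (the view name "logs" and the size threshold are made up for illustration):

// Register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("logs")
sparkSession.sql("SELECT ip, responseSize FROM logs WHERE responseSize > 100").show()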

III: Notes

1. This programmatic approach is more flexible than the reflection-based one shown on the official site, and it covers most use cases.
2. Pay attention to which transformations you chain together; there are many operators, so take care to pick the ones that fit your needs (finding the right operators took some trial and error at first; see the sketch below).
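
As an example of point 2: the Row-building step above assumes every line has exactly three tab-separated fields and that the third one is numeric; x(2).toLong throws otherwise. A hedged sketch of a defensive variant (the filter condition is one possible cleanup choice, not part of the original code):

val rdd = info
  .map(_.split("\t"))
  .filter(fields => fields.length == 3 && fields(2).forall(_.isDigit))
  .map(x => Row(x(0), x(1), x(2).toLong))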

IV: Result

(Screenshot of the df.show() output.)
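
For illustration only, with a hypothetical data.txt containing a tab-separated line such as (this sample data is made up, not from the original post):

10.0.0.1	2019-01-01 10:00:00	200

df.show() would print a table along these lines:

+--------+-------------------+------------+
|      ip|               time|responseSize|
+--------+-------------------+------------+
|10.0.0.1|2019-01-01 10:00:00|         200|
+--------+-------------------+------------+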