程式人生 > SparkSQL學習記錄(SparkSQL 兩種Schema建立方式)

SparkSQL學習記錄(SparkSQL 兩種Schema建立方式)

方式://1 通過定義Case Class,使用反射推斷Schema(case class方式)

     //2 通過可程式設計介面,定義Schema,並應用到RDD上(createDataFrame 方式)

依賴:

                <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-core_2.10</artifactId>
			<version>1.6.1</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-sql_2.10</artifactId>
			<version>1.6.1</version>
		</dependency>


方式一:
import org.apache.spark.SparkConf

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

// Approach 1: define a case class and let Spark infer the schema via reflection.
// `final` prevents subclassing, which would break case-class equality/copy semantics.
final case class Person(name: String, age: Int)
object SparkSqlDemo1 {
  // Demo: build a DataFrame from an RDD of case-class instances (reflection-based schema)
  // and query it with Spark SQL.
  def main(args: Array[String]) {
    // Local Spark context for this demo.
    val sparkConf = new SparkConf().setAppName("sparksqldemo1").setMaster("local")
    val sparkContext = new SparkContext(sparkConf)
    val sqlCtx = new SQLContext(sparkContext)

    // Parse each whitespace-separated line of test.txt into a Person.
    val peopleRdd = sparkContext
      .textFile("test.txt")
      .map(_.split(" "))
      .map(fields => Person(fields(0), fields(1).trim().toInt))

    // Brings the implicit RDD -> DataFrame conversion (toDF) into scope.
    import sqlCtx.implicits._
    val peopleDf = peopleRdd.toDF()
    peopleDf.registerTempTable("people")

    // Caching / uncaching the temp table — two equivalent APIs:
    //sqlCtx.cacheTable("people")
    //sqlCtx.uncacheTable("people")
    //sqlCtx.sql("CACHE TABLE people")
    //sqlCtx.sql("UNCACHE TABLE people")

    val teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 10 and age <= 19")

    // DSL (Domain Specific Language) alternative: the Scala symbol 'age refers to a
    // column of the underlying table and is implicitly converted into an expression
    // by Spark's execution engine. Related methods — where(), select(), limit() —
    // live in the Catalyst DSL submodule.
    //val teenagers = peopleDf.where('age >= 10).select('name)

    teenagers.map(row => "Name: " + row(0)).collect().foreach(println)

    sparkContext.stop()
  }
}


方式二:

import org.apache.spark.sql.SQLContext
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.types.StringType
import org.apache.spark.sql.Row
// Approach 2: define the schema programmatically and apply it to an RDD of Rows
// via createDataFrame.
object SparkSqlDemo2 {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sparksqldemo2").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Build the schema from a space-separated list of column names.
    // NOTE: every column (including age) is typed as StringType; the SQL
    // comparison below relies on Spark SQL's implicit cast of age to a number.
    val schemaString = "name age"
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))

    // Convert each whitespace-separated line of test.txt into a Row matching the schema.
    val rowRDD = sc.textFile("test.txt").map(_.split(" ")).map(p => Row(p(0), p(1).trim))
    val peopleDF = sqlContext.createDataFrame(rowRDD, schema)
    peopleDF.registerTempTable("people")

    sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
      .map(t => "Name: " + t(0)).collect().foreach(println)

    // Fix: release the SparkContext — the original never stopped it
    // (Demo1 calls sc.stop(); this demo should too).
    sc.stop()
  }
}