
Spark SQL Principles and DataFrame / Dataset API Operations, with Code Examples
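The snippets in this post target the Spark 1.x API (SQLContext, registerTempTable) and are written as commented-out alternatives meant to be enabled one at a time inside a driver program. The surrounding program is not shown, so the following is a minimal sketch of the setup they assume; the object name SparkSqlDemo and the local[*] master are illustrative choices, not part of the original code:

import java.util.Properties

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// Case class used by the reflection-based DataFrame and Dataset examples
case class Person(name: String, age: Int)

object SparkSqlDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkSqlDemo").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._   // enables rdd.toDF(), seq.toDS(), $"col" syntax

    // ...uncomment one of the creation snippets below and paste it here...

    sc.stop()
  }
}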

// Part 1: Creating DataFrames
//
// 1. From a JSON file (three equivalent ways to read the same file)
//    val df = sqlContext.read.json("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")
//    val df = sqlContext.read.format("json").load("G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")
//    sqlContext.read.format("json").load("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")
// 2. From a Parquet file
//    val df = sqlContext.read.parquet("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\users.parquet")
//
// 3. From a JDBC source
//    val props = new Properties()
//    props.put("user", "root")
//    props.put("password", "123456")
//    val df = sqlContext.read.jdbc("jdbc:mysql://hdp1:3306/spark", "student", props)
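For larger tables the same JDBC reader can split the scan into parallel partitions. A minimal sketch, reusing the URL and credentials above and assuming the mysql-connector-java driver is on the classpath and that the student table has a numeric id column (the column name and bounds here are illustrative):

val props = new Properties()
props.put("user", "root")
props.put("password", "123456")

// Read "student" in 4 partitions, splitting on the numeric column "id"
// over the value range [1, 10000]; rows outside the range are still read.
val partitionedDf = sqlContext.read.jdbc(
  "jdbc:mysql://hdp1:3306/spark", "student",
  "id",      // partition column (must be numeric)
  1L,        // lowerBound
  10000L,    // upperBound
  4,         // numPartitions
  props)
partitionedDf.show()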
// 4. From a registered temporary table (register an existing DataFrame, then query it with SQL)
//    df.registerTempTable("student")
//    var sql = sqlContext.sql("select * from student")
//    sql.printSchema()
//    sql.show()
//
// 5. From an Avro file (requires the external Databricks spark-avro package on the classpath)
//    import com.databricks.spark.avro._
//    val df = sqlContext.read.avro("D:/code/spark_code/course/data/users.avro")
//
// 6. From an RDD
// 6.1 Creating a DataFrame by reflection (the Person case class is inferred as the schema)
//    import sqlContext.implicits._
//    var rdd = sc.textFile("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.txt")
//      .map { line =>
//        val strs = line.split(",")
//        Person(strs(0), strs(1).trim.toInt)
//      }
//    val df = rdd.toDF()
//
// 6.2 Creating a DataFrame by specifying the schema explicitly (programmatic StructType)
//    var rdd = sc.textFile("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.txt")
//      .map { line =>
//        val strs = line.split(",")
//        Row(strs(0), strs(1).trim.toInt)
//      }
//    var structType = StructType(Array(
//      StructField("name", DataTypes.StringType),
//      StructField("age", DataTypes.IntegerType)
//    ))
//    val df = sqlContext.createDataFrame(rdd, structType)
//
// DataFrame operations
//    df.printSchema()   // print the schema (column names, types, nullability)
//    df.show()          // display rows on the client; only suitable for small result sets
//    val arrs = df.collect()
//    val list = df.collectAsList()
//    for (i <- 0 until list.size()) {
//      println(list.get(i))
//    }
//    import scala.collection.JavaConversions._   // needed to iterate the java.util.List below with a for-each loop
//    for (ele <- list) {
//      println(ele)
//    }
//    println(df.count())
//    println(df.describe("name", "age"))
//    println(df.first())
//    for (ele <- df.head(2)) {
//      println(ele)
//    }
//    for (ele <- df.take(1)) {
//      println(ele)
//    }
//    for (ele <- df.columns) {
//      println(ele)
//    }
//    println(df.schema)
//    println(df.select("age").explain())
//
//    Filtering by condition
//    println(df.filter(df.col("age").gt(20)).first())
//    println(df.filter(df.col("age") > 20).first())
//    println(df.agg(("name" -> "count")).first())
//    println(df.groupBy("name").count())
//    df.registerTempTable("people")
//    println(sqlContext.sql("select * from people where age > 20").first())
//
// Part 2: Dataset
// 1. Creating a Dataset
//    import sqlContext.implicits._
//    var dS = List(Person("Kevin", 24), Person("Jhon", 20)).toDS()
//    val list = List(Person("Kevin", 24), Person("Jhon", 20))
//    val frame = sqlContext.createDataFrame(list)
//    frame.printSchema()
//    var rdd = sc.textFile("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.txt")
//      .map { line =>
//        val strs = line.split(",")
//        Person(strs(0), strs(1).trim.toInt)
//      }
//    var dS = rdd.toDS()
//    dS.printSchema()
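To tie the pieces together, here is a minimal end-to-end sketch under the same assumptions as the setup block above (Spark 1.x, the people.json file shipped with the Spark examples, and the Person case class); it combines a JSON read, a couple of DataFrame operations, a SQL query on a temporary table, and a Dataset created from a local collection:

import sqlContext.implicits._   // if not already imported in the setup

// DataFrame from JSON, then typed operations and SQL on a temp table
val people = sqlContext.read.json("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")
people.printSchema()
people.filter(people.col("age") > 20).show()
people.groupBy("name").count().show()

people.registerTempTable("people")
sqlContext.sql("select name, age from people where age > 20").show()

// Dataset from a local collection; the lambda is checked against Person at compile time
val ds = List(Person("Kevin", 24), Person("Jhon", 20)).toDS()
ds.map(p => p.name.toUpperCase).collect().foreach(println)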