Spark SQL Basics: DataFrame and DataSet API Operations, with Code Walkthrough
阿新 · Published 2019-02-13
// Imports needed by the examples below (Spark 1.x API, as used in this post)
import java.util.Properties
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

// Case class used by the reflection-based examples; it must be defined
// outside the method that uses it so Spark can inspect it.
case class Person(name: String, age: Int)

// In spark-shell, sc and sqlContext are predefined; in a standalone app
// create them explicitly:
val conf = new SparkConf().setAppName("SparkSQLDemo").setMaster("local[*]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)

// Part 1: Creating a DataFrame

// 1. From a JSON file
val dfJson = sqlContext.read.json("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")
// Equivalent forms using the generic loader:
// sqlContext.read.format("json").load("G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")
// sqlContext.read.format("json").load("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.json")

// 2. From a Parquet file
val dfParquet = sqlContext.read.parquet("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\users.parquet")

// 3. Via JDBC (requires the MySQL JDBC driver on the classpath)
val props = new Properties()
props.put("user", "root")
props.put("password", "123456")
val dfJdbc = sqlContext.read.jdbc("jdbc:mysql://hdp1:3306/spark", "student", props)

// 4. From a registered table
dfJdbc.registerTempTable("student")
val sql = sqlContext.sql("select * from student")
sql.printSchema()
sql.show()

// 5. From an Avro file (requires the spark-avro package on the classpath)
import com.databricks.spark.avro._
val dfAvro = sqlContext.read.avro("D:/code/spark_code/course/data/users.avro")

// 6. From an RDD
// 6.1 Reflection-based: infer the schema from a case class
import sqlContext.implicits._
val personRdd = sc.textFile("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.txt")
  .map { line =>
    val strs = line.split(",")
    Person(strs(0), strs(1).trim.toInt)
  }
val df = personRdd.toDF()

// 6.2 Programmatic: attach an explicit schema to an RDD[Row]
val rowRdd = sc.textFile("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.txt")
  .map { line =>
    val strs = line.split(",")
    Row(strs(0), strs(1).trim.toInt)
  }
val structType = StructType(Array(
  StructField("name", DataTypes.StringType),
  StructField("age", DataTypes.IntegerType)
))
val dfWithSchema = sqlContext.createDataFrame(rowRdd, structType)

// DataFrame operations
df.printSchema()   // print the schema (column names, types, nullability)
df.show()          // display rows on the driver; only for small data sets

val arrs = df.collect()         // Array[Row]
val list = df.collectAsList()   // java.util.List[Row]
for (i <- 0 until list.size()) {
  println(list.get(i))
}
// Iterating a java.util.List directly needs the Java-to-Scala conversions:
import scala.collection.JavaConversions._
for (ele <- list) {
  println(ele)
}

println(df.count())
df.describe("name", "age").show()   // summary statistics; use show(), not println()
println(df.first())
for (ele <- df.head(2)) {
  println(ele)
}
for (ele <- df.take(1)) {
  println(ele)
}
for (ele <- df.columns) {
  println(ele)
}
println(df.schema)
df.select("age").explain()   // explain() prints the plan itself and returns Unit

// Filtering by a condition
println(df.filter(df.col("age").gt(20)).first())
println(df.filter(df.col("age") > 20).first())
println(df.agg("name" -> "count").first())
df.groupBy("name").count().show()

df.registerTempTable("people")
println(sqlContext.sql("select * from people where age > 20").first())

// Part 2: Dataset
// 1. Creating a Dataset (Spark 1.6+)
import sqlContext.implicits._
val dsFromList = List(Person("Kevin", 24), Person("Jhon", 20)).toDS()

val people = List(Person("Kevin", 24), Person("Jhon", 20))
val frame = sqlContext.createDataFrame(people)
frame.printSchema()

val dsFromRdd = sc.textFile("file:\\G:\\code\\source_code\\spark\\examples\\src\\main\\resources\\people.txt")
  .map { line =>
    val strs = line.split(",")
    Person(strs(0), strs(1).trim.toInt)
  }
  .toDS()
dsFromRdd.printSchema()
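The listing above uses the Spark 1.x entry points (SparkContext plus SQLContext), which is what this post was written against. As a point of reference, here is a minimal sketch of the same JSON-to-SQL-to-Dataset flow on the SparkSession API that replaced those entry points in Spark 2.x. The object name and the relative input path (the people.json sample bundled with the Spark source) are illustrative assumptions; adjust them to your environment.

import org.apache.spark.sql.SparkSession

// Defined at top level so the Dataset encoder can be derived for it.
case class Employee(name: String, age: Int)

object SparkSessionSketch {
  def main(args: Array[String]): Unit = {
    // SparkSession subsumes SQLContext (and HiveContext) from Spark 2.0 on.
    val spark = SparkSession.builder()
      .appName("SparkSessionSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // DataFrame from JSON; the schema is inferred from the file.
    // Assumed path: the people.json sample shipped with the Spark source.
    val df = spark.read.json("examples/src/main/resources/people.json")
    df.printSchema()

    // createOrReplaceTempView is the replacement for registerTempTable.
    df.createOrReplaceTempView("people")
    spark.sql("select name, age from people where age > 20").show()

    // A typed Dataset from local objects, as in Part 2 of the post.
    val ds = Seq(Employee("Kevin", 24), Employee("Jhon", 20)).toDS()
    ds.filter(_.age > 20).show()

    spark.stop()
  }
}

SQLContext and registerTempTable still work in Spark 2.x for compatibility, but they are deprecated; new code should prefer the SparkSession forms shown in this sketch.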