1. 程式人生 > >spark複習筆記(7):sparkSQL

spark複習筆記(7):sparkSQL

一、saprkSQL模組,使用類sql的方式訪問Hadoop,實現mr計算,底層使用的是rdd

  1.hive      //hadoop  mr  sql

  2.phenoix    //hbase上構建sql的互動過程

  該模組能在spark上執行sql語句

  3.DataFrame    //資料框,表

  在spark中的資料框,sparkSQL就能以完全分散式的方式來處理資料。組合資料框可以來自各種資料來源來進行查詢的處理

  4.SparkSQL    //SQL  |  DataFrame API

  5.RDD[Customer]===>

    $scala>df=sc.createDataFrame(rdd);

$scala>df = sc.createDataFrame(rdd);
//建立樣例類 $scala
>case class Customer1(id:Int,name:String,age:Int)
//構造資料
$scala>val arr = Array("1,tom,12","2,tomas,13","3,tomasLee,14")
$scala>val rdd1 = sc.makeRDD(arr)
//建立物件RDD
$scala>val rdd2=rdd1.map(e=>{
  val arr= e.split(",");
  Customer1(arr(0).toInt,arr(1),arr(2).toInt)
})
//建立customer的rdd,通過rdd建立資料框
$scala>val df = spark.createDataFrame(rdd2)
//打印表結構
//建立臨時檢視
$scala>df.createTempView("customers")
//打印表結構
$scala>df.printSchema
$scala>df.show    //等價於查詢資料
//建立臨時檢視
$scala>df.createTempView("customers")
//使用sparkSQL來進行相關的查詢
val df2 = spark.sql("select * from customers")
//將上述結果進行相關的顯示
df2.show
//帶條件進行相關的查詢
val df2 = spark.sql("select * from customers where id<2")
df2.show
//或者用如下的方式直接show
spark.sql("select * from customer").show  
val df1 = spark.sql("select * from customer where id<2")

val df2 = spark.sql("select * from customers where id>2")

df1.show

df2.show

df.create

df1.createTempView("c1")

df2.createTempView("c2")

val dff = spark.sql("select * from c1 union select * from c2")

dff.show      //顯示前面查詢的結果


$scala>spark.sql("select * from c1 from union select *from c2").show

df1.union(df2).show

spark.sql("select count(*) from customer").show

spark.sql("select * from customer limit 1").show

spark.sql("select *from customers where name like 't%' order by name desc").show
//對映聚合操作
df.map(_.getAs[Int]("age")).reduce(_ + _)

//聚合函式
df.agg(sum("age"),max("age"),min("age"))

  sparkQSL :使用類似SQL方式訪問hadoop,實現MR計算。RDD

  df= sc.createDataFrame(rdd);

  DataSet<Row> ===DataFrame===>//類似於table操作

儲存spark的sql計算結果(json)

  JavaRDD<Row> rdd = df1.toJava();

儲存spark的sql計算結果(json)

  //儲存成json檔案。

  df.write().json(dir)  //這個地方寫的是資料夾,就是儲存檔案的上級目錄

  //設定儲存模式

  df.mode(SaveMode.APPEND);

 

json檔案的讀寫

---------------------------------

  SparkSession.read().json("")  //讀取json檔案形成資料框

  //將資料框的資料寫入json檔案

  SparkSession.write().json("........")  //將資料框的資料寫成json檔案

 

SparkDataFrame以jdbc的方式操縱表

 

SparkDataFrame以jdbc的方式來操縱表

  1.引入mysql驅動

    pom.xml直接修改

spark整合Hive

  1.hive的類庫需要在spark的worker節點,他們也需要通過類庫來訪問hive

  2.複製core-site.xml(hdfs) + hdfs-site.xml + hive-site.xml(hive)這三個檔案複製到spark/conf目錄下面

  3.指定hive的home目錄環境變數

  4.賦值mysql驅動序列到/soft/spark/jars目錄下面 

  5.啟動spark-shell,指定啟動模式

    spark-shell --master local[4]

    create table tt(id int,anme string,age int)

    row format delimited fields terminated by ','

    lines terminated by '\n' stored as textfile;