
Spark Learning (4): A Smooth Transition from Hive to Spark SQL


Using SQLContext

Entry point for Spark SQL in Spark 1.x: SQLContext

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame
import sqlContext.implicits._

Create a Scala Maven project.
SQLContext test:
Add the relevant POM dependencies:

<properties>
  <scala.version>2.11.8</scala.version>
  <spark.version>2.1.0</spark.version>
</properties>

<dependencies>
  <!-- Scala -->
  <dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
  </dependency>
  <!-- Spark SQL -->
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>${spark.version}</version>
  </dependency>
</dependencies>

Test data:

{"name": "zhangsan", "age": 30}
{"name": "Michael"}
{"name": "Andy", "age": 30}
{"name": "Justin", "age": 19}

package com.kun.sparksql

import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

import org.apache.spark.SparkConf

/**
 * Using SQLContext.
 * Note: IDEA runs locally while the test data sits on the server; can we still develop and test locally?
 */
object SQLContextApp {

  def main(args: Array[String]) {

    val path = args(0)

    //1) Create the corresponding context
    val sparkConf = new SparkConf()

    //On a test or production cluster, AppName and Master are specified via the submit script
    //sparkConf.setAppName("SQLContextApp").setMaster("local[2]")

    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)

    //2) Processing: read the JSON file
    val people = sqlContext.read.format("json").load(path)
    people.printSchema()
    people.show()

    //3) Release resources
    sc.stop()
  }

}

Local test:

The printed result is a table structure:

+----+--------+
| age|    name|
+----+--------+
|  30|zhangsan|
|null| Michael|
|  30|    Andy|
|  19|  Justin|
+----+--------+

Cluster test:
1) Build: mvn clean package -DskipTests
2) Upload the jar to the cluster

Submit the Spark application to the environment:

spark-submit \
  --name SQLContextApp \
  --class com.kun.sparksql.SQLContextApp \
  --master local[2] \
  /home/hadoop/lib/sql-1.0.jar \
  /home/hadoop/app/spark-2.1.0-bin-2.6.0-cdh5.7.0/examples/src/main/resources/people.json

The result is the same as the local test above.

Using HiveContext

Entry point for Spark SQL in Spark 1.x: HiveContext
To use HiveContext, all you need is a hive-site.xml:
copy Hive's configuration file hive-site.xml into the ${SPARK_HOME}/conf directory.
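A minimal sketch of that copy step, assuming the usual $HIVE_HOME and $SPARK_HOME environment variables:

cp $HIVE_HOME/conf/hive-site.xml $SPARK_HOME/conf/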

val sc: SparkContext // An existing SparkContext.
val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)

Add the Maven dependency:

    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>

Test:

package com.kun.sparksql

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Using HiveContext (shows the contents of the emp table in Hive).
 * When running, pass the MySQL driver onto the classpath via --jars.
 */
object HiveContextApp {

  def main(args: Array[String]) {
    //1) Create the corresponding context
    val sparkConf = new SparkConf()

    //In test or production, AppName and Master are specified via the submit script
    //sparkConf.setAppName("HiveContextApp").setMaster("local[2]")

    val sc = new SparkContext(sparkConf)
    val hiveContext = new HiveContext(sc)

    //2) Processing: show the emp table
    hiveContext.table("emp").show

    //3) Release resources
    sc.stop()
  }
}

The local and cluster test steps are similar to those for SQLContext.

Submit the Spark application to the environment (note that --jars must come before the application jar):

spark-submit \
  --name HiveContextApp \
  --class com.kun.sparksql.HiveContextApp \
  --master local[2] \
  --jars /path/to/mysql-connector-java.jar \
  /home/hadoop/lib/sql-1.0.jar

Using SparkSession

Entry point for Spark SQL in Spark 2.x: SparkSession

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._
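As a small sketch of what those implicits enable (the Person case class and the inline records are hypothetical, mirroring the test data above), an RDD of case-class instances can be converted to a DataFrame with toDF():

// define the case class at top level so Spark can derive an encoder for it
case class Person(name: String, age: Int)

// parallelize a small local collection; toDF() is provided by spark.implicits._
val peopleDF = spark.sparkContext
  .parallelize(Seq(Person("zhangsan", 30), Person("Justin", 19)))
  .toDF()

peopleDF.show()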

Test:

package com.kun.sparksql

import org.apache.spark.sql.SparkSession

/**
 * Using SparkSession
 */
object SparkSessionApp {

  def main(args: Array[String]) {

    val spark = SparkSession.builder().appName("SparkSessionApp")
      .master("local[2]").getOrCreate()

    val people = spark.read.json("file:///Users/rocky/data/people.json")
    people.show()

    spark.stop()
  }
}

Using spark-shell & spark-sql

Check the running processes.

spark-shell

Enter Hive to see the existing tables there.
Start spark-shell with: spark-shell --master local[2]
Before hive-site.xml is configured, Spark sees no tables.
Copy Hive's configuration file hive-site.xml into the ${SPARK_HOME}/conf directory, then restart spark-shell; when starting it, pass the MySQL driver onto the classpath via --jars.
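For example (the driver jar path is whatever your environment uses):

spark-shell --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar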

View the tables in Hive:
Access the dept table:
With the same query, Hive launches a MapReduce job, while Spark returns the data directly.
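In the spark-shell (Spark 2.x exposes the session as spark), the two steps above look roughly like this:

// list the tables registered in the Hive metastore
spark.sql("show tables").show()

// query the dept table; Spark returns the rows without launching a MapReduce job
spark.sql("select * from dept").show()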

spark-sql

Start spark-sql:
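A start command along the same lines as spark-shell (assuming the Hive metastore still needs the MySQL driver on the classpath):

spark-sql --master local[2] --jars ~/software/mysql-connector-java-5.1.27-bin.jar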
Run a query:

select * from emp;

Refreshing the Spark UI at this point shows the corresponding job.

View the Spark SQL execution plan (the SQL statement is arbitrary; it is only used to inspect the plan):

create table t(key string, value string);
explain extended select a.key*(2+3), b.value from  t a join t b on a.key = b.key and a.key > 3;

Execution plan:

== Parsed Logical Plan ==
'Project [unresolvedalias(('a.key * (2 + 3)), None), 'b.value]
+- 'Join Inner, (('a.key = 'b.key) && ('a.key > 3))
   :- 'UnresolvedRelation `t`, a
   +- 'UnresolvedRelation `t`, b

== Analyzed Logical Plan ==
(CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE)): double, value: string
Project [(cast(key#321 as double) * cast((2 + 3) as double)) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#325, value#324]
+- Join Inner, ((key#321 = key#323) && (cast(key#321 as double) > cast(3 as double)))
   :- SubqueryAlias a
   :  +- MetastoreRelation default, t
   +- SubqueryAlias b
      +- MetastoreRelation default, t

== Optimized Logical Plan ==
Project [(cast(key#321 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#325, value#324]
+- Join Inner, (key#321 = key#323)
   :- Project [key#321]
   :  +- Filter (isnotnull(key#321) && (cast(key#321 as double) > 3.0))
   :     +- MetastoreRelation default, t
   +- Filter (isnotnull(key#323) && (cast(key#323 as double) > 3.0))
      +- MetastoreRelation default, t

== Physical Plan ==
*Project [(cast(key#321 as double) * 5.0) AS (CAST(key AS DOUBLE) * CAST((2 + 3) AS DOUBLE))#325, value#324]
+- *SortMergeJoin [key#321], [key#323], Inner
   :- *Sort [key#321 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(key#321, 200)
   :     +- *Filter (isnotnull(key#321) && (cast(key#321 as double) > 3.0))
   :        +- HiveTableScan [key#321], MetastoreRelation default, t
   +- *Sort [key#323 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(key#323, 200)
         +- *Filter (isnotnull(key#323) && (cast(key#323 as double) > 3.0))
            +- HiveTableScan [key#323, value#324], MetastoreRelation default, t

Using thriftserver & beeline

Go to ${SPARK_HOME}/sbin and start the thriftserver:
start-thriftserver.sh --master local[2] --jars /path/to/mysql-driver.jar
Check whether it has started.
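One way to verify, assuming a standard setup:

# the thriftserver runs as a SparkSubmit process
jps
# and by default it listens on port 10000
netstat -an | grep 10000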

1) Start the thriftserver. The default port is 10000 and can be changed.

To change the default port the thriftserver listens on:

./start-thriftserver.sh \
  --master local[2] \
  --jars ~/software/mysql-connector-java-5.1.27-bin.jar \
  --hiveconf hive.server2.thrift.port=14000

After starting the thriftserver this way, beeline must connect to the new port as well:
beeline -u jdbc:hive2://localhost:14000 -n hadoop

2) Start beeline (${SPARK_HOME}/bin/beeline); multiple beeline clients can be started:

beeline -u jdbc:hive2://localhost:10000 -n hadoop

The executed SQL can be viewed on the JDBC/ODBC Server page of the Spark UI.

What is the difference between the thriftserver and a plain spark-shell/spark-sql?

1) Each spark-shell or spark-sql session is its own Spark application.
2) The thriftserver is always a single Spark application, no matter how many clients (beeline/code) connect to it.
This solves the data-sharing problem: multiple clients can share data, as the sketch below illustrates.
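A minimal SQL sketch of that sharing, assuming the emp table from earlier: a table cached from one beeline session stays cached for every other session connected to the same thriftserver.

-- in beeline session 1: cache the table inside the shared Spark application
cache table emp;

-- in beeline session 2: this query is served from the cached data
select count(*) from emp;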

Programmatic access via JDBC

Note: when developing against JDBC, the thriftserver must be started first, otherwise you will see:

Exception in thread "main" java.sql.SQLException:
Could not open client transport with JDBC Uri: jdbc:hive2://hadoop001:14000:
java.net.ConnectException: Connection refused

Add the Maven dependency:

<dependency>
  <groupId>org.spark-project.hive</groupId>
  <artifactId>hive-jdbc</artifactId>
  <version>1.2.1.spark2</version>
</dependency>

Test:

package com.kun.sparksql

import java.sql.DriverManager

/**
 * Access Spark SQL through the thriftserver via JDBC
 */
object SparkSQLThriftServerApp {

  def main(args: Array[String]) {

    // load the Hive JDBC driver
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // connect to the thriftserver started above on port 14000 and run a query
    val conn = DriverManager.getConnection("jdbc:hive2://hadoop001:14000", "hadoop", "")
    val pstmt = conn.prepareStatement("select empno, ename, sal from emp")
    val rs = pstmt.executeQuery()
    while (rs.next()) {
      println("empno:" + rs.getInt("empno") +
        " , ename:" + rs.getString("ename") +
        " , sal:" + rs.getDouble("sal"))

    }

    rs.close()
    pstmt.close()
    conn.close()
  }
}