1. 程式人生 > >Hive-On-Spark

Hive-On-Spark

1 HiveOnSpark簡介

Hive On Spark (跟hive沒太大的關係,就是使用了hive的標準(HQL, 元資料庫、UDF、序列化、反序列化機制))

Hive原來的計算模型是MR,有點慢(將中間結果寫入到HDFS中)

Hive On Spark 使用RDD(DataFrame),然後執行在spark 叢集上

真正要計算的資料是儲存在HDFS中,mysql這個元資料庫,儲存的是hive表的描述資訊,描述了有哪些database、table、以及表有多少列,每一列是什麼型別,還要描述表的資料儲存在hdfs的什麼位置?

 

hive跟mysql的區別?

hive是一個數據倉庫(儲存資料並分析資料,分析資料倉庫中的資料量很大,一般要分析很長的時間)

mysql是一個關係型資料庫(關係型資料的增刪改查(低延遲))

 

hive的元資料庫中儲存要計算的資料嗎?

不儲存,儲存hive倉庫的表、欄位、等描述資訊

 

真正要計算的資料儲存在哪裡了?

儲存在HDFS中了

 

hive的元資料庫的功能

建立了一種對映關係,執行HQL時,先到MySQL元資料庫中查詢描述資訊,然後根據描述資訊生成任務,然後將任務下發到spark叢集中執行

 

hive  on spark  使用的僅僅是hive的標準,規範,不需要有hive資料庫一樣可行。

hive : 元資料,是存放在mysql中,然後真正的資料是存放在hdfs中。

2 安裝mysql

mysql資料庫作為hive使用的元資料

3 配置HiveOnSpark

生成hive的元資料庫表,根據hive的配置檔案,生成對應的元資料庫表。

 

spark-sql 是spark專門用於編寫sql的互動式命令列。

當直接啟動spark-sql以local模式執行時,如果報錯:

是因為配置了Hadoop的配置引數導致的:

執行測試命令:

create table test (name string);

insert into test values(“xxtest”);

local模式下,預設使用derby資料庫,資料儲存於本地位置。

要想使用hive的標準,需要把hive的配置檔案放到spark的conf目錄下

cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/

vi hive-site.xml

 

hive-site.xml檔案:

<configuration>
    <property>
        <name>javax.jdo.option.ConnectionURL</name>
        <value>jdbc:mysql://hdp-01:3306/hive?createDatabaseIfNotExist=true</value>
        <description>JDBC connect string for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionDriverName</name>
        <value>com.mysql.jdbc.Driver</value>
        <description>Driver class name for a JDBC metastore</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionUserName</name>
        <value>root</value>
        <description>username to use against metastore database</description>
    </property>

    <property>
        <name>javax.jdo.option.ConnectionPassword</name>
        <value>123456</value>
        <description>password to use against metastore database</description>
    </property>
</configuration>

把該配置檔案,傳送給叢集中的其他節點:

cd /root/apps/spark-2.2.0-bin-hadoop2.7/conf/

for i in 2 3 ;do scp hive-site.xml hdp-0$i:`pwd` ;done

重新停止並重啟spark: start-all.sh

啟動spark-sql時,

出現如下錯誤是因為操作mysql時缺少mysql的驅動jar包,

解決方案1:--jars 或者 --driver-class-path  引入msyql的jar包

解決方案2: 把mysql的jar包新增到$spark_home/jars目錄下

啟動時指定叢集:(如果不指定master,預設就是local模式)

spark-sql --master spark://hdp-01:7077  --jars /root/mysql-connector-java-5.1.38.jar

sparkSQL會在mysql上建立一個database,需要手動改一下DBS表中的DB_LOCATION_UIR改成hdfs的地址

hdfs://hdp-01:9000/user/hive/spark-warehouse

 

也需要檢視一下,自己建立的資料庫表的儲存路徑是否是hdfs的目錄。

執行spark-sql任務之後:可以在叢集的監控介面檢視

同樣 ,會有SparkSubmit程序存在。

 

4 IDEA程式設計

要先開啟spark對hive的支援

//如果想讓hive執行在spark上,一定要開啟spark對hive的支援
val session = SparkSession.builder()
  .master("local")
  .appName("xx")
  .enableHiveSupport() // 啟動對hive的支援, 還需新增支援jar包
  .getOrCreate()

要新增spark對hive的相容jar包

<!--sparksql對hive的支援-->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.11</artifactId>
    <version>${spark.version}</version>
</dependency>

在本地執行,還需把hive-site.xml檔案拷貝到resource目錄下。

resources目錄,存放著當前專案的配置檔案

編寫程式碼,local模式下測試:

// 執行查詢
val query = session.sql("select * from t_access_times")
query.show()
// 釋放資源
session.close()

建立表的時候,需要偽裝客戶端身份

System.setProperty("HADOOP_USER_NAME", "root") // 偽裝客戶端的使用者身份為root
//  或者新增執行引數 –DHADOOP_USER_NAME=root

 

基本操作

  // 求每個使用者的每月總金額
    //    session.sql("select username,month,sum(salary) as salary from t_access_times group by username,month")
    // 建立表
    //    session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")

    // 刪除表
    //    session.sql("drop table t_access1")

    // 插入資料
    //    session.sql("insert into  t_access1  select * from t_access_times")
    //    .show()
    // 覆蓋寫資料
    //    session.sql("insert overwrite table  t_access1  select * from t_access_times where username='A'")

    // 覆蓋load新資料
    //    C,2015-01,10
    //    C,2015-01,20
    //    session.sql("load data local inpath 't_access_time_log' overwrite into table t_access1")

    // 清空資料
    //    session.sql("truncate table t_access1")

    //      .show()

    // 寫入自定義資料
    val access: Dataset[String] = session.createDataset(List("b,2015-01,10", "c,2015-02,20"))

    val accessdf = access.map({
      t =>
        val lines = t.split(",")
        (lines(0), lines(1), lines(2).toInt)
    }).toDF("username", "month", "salary")

    //    .show()

    accessdf.createTempView("t_ac")
    //    session.sql("insert into t_access1 select * from t_ac")

    // overwrite模式會重新建立新的表 根據指定schema資訊   SaveMode.Overwrite
    // 本地模式只支援 overwrite,必須在sparksession上新增配置引數:
//     .config("spark.sql.warehouse.dir", "hdfs://hdp-01:9000/user/hive/warehouse")
    accessdf
      .write.mode("overwrite").saveAsTable("t_access1")

 

叢集執行:

需要把hive-site.xml配置檔案,新增到$SPARK_HOME/conf目錄中去,重啟spark

上傳一個mysql連線驅動(sparkSubmit也要連線MySQL,獲取元資料資訊)

spark-sql --master spark://hdp-01:7077 --driver-class-path /root/mysql-connector-java-5.1.38.jar

--class   xx.jar

 

然後執行程式碼的編寫:

  // 執行查詢 hive的資料表
//    session.sql("select * from t_access_times")
//      .show()

    // 建立表
//    session.sql("create table t_access1(username string,month string,salary int) row format delimited fields terminated by ','")


//      session.sql("insert into t_access1 select * from t_access_times")
//    .show()

    // 寫資料
    val access: Dataset[String] = session.createDataset(List("b,2015-01,10", "c,2015-02,20"))

    val accessdf = access.map({
      t =>
        val lines = t.split(",")
        (lines(0), lines(1), lines(2).toInt)
    }).toDF("username", "month", "salary")


    accessdf.createTempView("v_tmp")
    // 插入資料
//    session.sql("insert overwrite table t_access1 select * from v_tmp")
    session.sql("insert into t_access1 select * from v_tmp")
//    .show()

// insertInto的api  入庫
accessdf.write.insertInto("databaseName.tableName")
    session.close()