1. 程式人生 > >4 步搞定 Hive 增量更新

4 步搞定 Hive 增量更新

Hive 的更新很有趣。

Hive 的表有兩種,一種是 managed table, 一種是 external table.

managed table 是 Hive 自動幫我們維護的表,自動分割底層儲存檔案,自動分割槽,這些自動化的操作,都是 Hive 封裝了與 Hadoop 互動的介面。

external table 只是一種在 Hive 維護的與外部檔案的對映。

managed table 與 external table 最大的區別在於刪除的時候,external table 預設情況下只是刪除表定義,而資料依舊在hadoop 上儲存著;managed table 則是表定義連著表資料一起被刪除了。

早期的時候, Hive 支援的表操作只有兩種:OverWrite 和 Appand

Overwrite 並不是對某一行的資料做更新,而是對整張表做覆蓋,所以感覺上 Hive 更像是在做 ETL 裡面的 Staging, 而不像是最終儲存計算結果的地方。Hive 超強的計算能力可以做為大資料量轉換的工具,最終結果將被送到關係型資料庫或者其他 Hive 例項上儲存。

hortonworks 有一篇提出相關解決方案的文章,介紹了 4步走解決增量更新 Hive 表:

url如下:

  1. Ingest

  2. Reconcile

  3. Compact

  4. Purge

過程中,用到了四個 Hive 表,分別是:

base_table: 初始化裝載源庫來的表資料,表示最新資料

incremental_table:用來裝載上一次增量更新以來,發生過更改的資料,包括新增,更新,和刪除

reconcile_view:以 base_table, incremental_table 計算出來的最新資料,涉及到的操作,有刪除,更新,和新增。每一次都要重複計算是不是有些多餘,浪費很多對沒有變更的資料的重複計算。如果有對資料有分割槽,只要對有資料更新的分割槽做增量更新,會有很大效率的提高。

reporting_table:將reconcile_view的資料,裝載到 reporting_table中,用它來替換掉 base_table中的資料。

一) 取決於源資料庫的服務是否支援直連抽取資料,可以有兩種方法完成第一步 ingest, 即 Extract.

  1. File Processing: 由源資料庫自發的輸出,以檔案方式在合理的時間視窗匯出

  2. RDBMS Processing (Database Client based ETL): 由 Sqoop 來完成抽取; ETL 工具, kettle, Informatica等;

File Processing :

  1. 由資料庫軟體自帶的匯入匯出,將檔案匯出一定分隔符分割的文字檔案

  2. 將這些文字檔案放到 Hive 對映的資料夾下面

RDBMS Processing (Database Client based ETL):

  1. SQOOP: 既可以實現初始化匯入,也可以完成增量匯入,增量匯入的實現,依賴於Sqoop 本身的 check-sum 機制。check-sum 是對 Hive 表中的一行用來做校驗資料做了 hash 計算,根據匹配是否來做增量更新。

以下是文章的原文,展示了 Sqoop 的具體用法:

SQOOP is the JDBC-based utility for integrating with traditional
databases.

A SQOOP Import allows for the movement of data into either HDFS (a
delimited format can be defined as part of the Import definition) or
directly into a Hive table.

The entire source table can be moved into HDFS or Hive using the
“–table” parameter.

sqoop import

--connect jdbc:teradata://{host name or ip address}/Database=retail

--connection-manager org.apache.sqoop.teradata.TeradataConnManager

--username dbc

--password dbc

--table SOURCE_TBL

--target-dir /user/hive/incremental_table -m 1

注**

–table source_TBL: 是指關係型資料庫裡的原表

–target-dir :Hive 中表對應的儲存目錄

After the initial import, subsequent imports can leverage SQOOP’s native support for “Incremental Import” by using the “check-column”, “incremental” and “last-value” parameters.

sqoop import

--connect jdbc:teradata://{host name or ip address}/Database=retail

--connection-manager org.apache.sqoop.teradata.TeradataConnManager

--username dbc

--password dbc

--table SOURCE_TBL

--target-dir /user/hive/incremental_table -m 1

--check-column modified_date

--incremental lastmodified

--last-value {last_import_date|last_import_value}

注**

–check-column : 是指定原表中用來做增量判斷條件的那一欄位

–incremental lastmodified: 指定增量的模式,append 或者 lastmodified.

在資料倉庫中,無論是維度表還是事實表,我們總會設計一欄自增列,作為代理鍵或者主鍵。這個時候這些鍵值總是自增長的,因此適合採用 append 形式,指定check-sum 列為自增列,如果有比 {last_import_value}大的值,就會被 sqoop 匯入進來;

在設計資料庫的時候,為了審計,我們通常也會設計一列為 timestamp 列,每對一行做了修改,就會重置這列 timestamp 為當前時間戳。如果是針對這類行資料,我們要指定的便是 lastmodified, 配合 check-sum 設定為 timestamp 列,sqoop 就會匯入比{last_import_date} 大的資料行。

這裡寫圖片描述

–last-value { last_import_date } 這是需要從程式外面傳進來的

考慮到這是增量更新,那麼理應把 sqoop 做成一個 Job 來自動化執行,並且記錄每一次的時間,作為下次執行時要傳入的 {last_import_date} 或者{last_import_value}

Alternately, you can leverage the “query” parameter, and have SQL select statements limit the import to new or changed records only.

sqoop import

--connect jdbc:teradata://{host name or ip address}/Database=retail

--connection-manager org.apache.sqoop.teradata.TeradataConnManager

--username dbc

--password dbc

--target-dir /user/hive/incremental_table -m 1

--query 'select * from SOURCE_TBL where modified_date > {last_import_date} AND $CONDITIONS’

Note: For the initial load, substitute “base_table” for “incremental_table”. For all subsequent loads, use “incremental_table”.

注**

這是前面兩種全量和增量的替代寫法,用指定的查詢,從原關係型資料庫匯出資料,不同的是,全量的時候,要指定匯入的 Hive 目標表是 base_table, 而增量的時候,匯入的是 incremental_table.

二) Reconciliation 將新舊資料綜合起來

初始化時,裝載最終的目標表沒有多少難度。

在這段中,主要解決的問題是增量與初始化的融合。

初始化的資料,儲存在 base_table 中, 而增量資料我們已經裝載到了 incremental_table 中。

將兩者的資料合二為一,就可以生成與源資料庫一致的最新資料。

前提是源資料庫的任何資料行不接受硬刪除即delete 操作,而是在行上打了一個軟刪除的標籤,表示該行已刪除。

如果是做了硬刪除,那麼同時也要做好刪除的審計,將刪除的資料行放入審計表中,一同傳送給 incremental_table .

base_table

CREATE TABLE base_table (

id string,

field1 string,

field2 string,

field3 string,

field4 string,

field5 string,

modified_date string

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY ','

LOCATION '/user/hive/base_table';

incremental_table

CREATE EXTERNAL TABLE incremental_table (

id string,

field1 string,

field2 string,

field3 string,

field4 string,

field5 string,

modified_date string

)

ROW FORMAT DELIMITED

FIELDS TERMINATED BY ','

LOCATION '/user/hive/incremental_table';

reconcile_view

CREATE VIEW reconcile_view AS

SELECT t2.id, t2.field1, t2.field2, t2.field3, t2.field4, t2.field5, t2.modified_date FROM

(SELECT *,ROW_NUMBER() OVER (PARTITION BY id ORDER BY modified_date DESC) rn

FROM (SELECT * FROM base_table

UNION ALL

SELECT * FROM incremental_table)

t1) t2

WHERE rn = 1;

從最後一個view定義來解說,incremental_table 必須擁有增量記錄的全部,因此硬刪除操作就不會反應在 incremental_table 裡頭。

但是 reconcile_view 所涉及的量畢竟有限,浪費明明不會更改的那部分資料的計算。

因此如果能做好分割槽,僅僅對某幾個分割槽做全量更新會更高效。

三) Compact: 物化檢視,即將reconciliation_view 裝載到 reporting_table 裡面去

reporting_table

DROP TABLE reporting_table;

CREATE TABLE reporting_table AS

SELECT * FROM reconcile_view;

首先是要將之前的 reporting_table 刪除,再重建 reporting _table, 用 reconciliation_view 填充這張表。

在這張表的基礎上,可以做很多聚合,過濾等操作,進行資料二次加工。

四) Purge: 將多餘的表資料清空

base_table :應該當換成 reporting_table 裡面的資料

incremental_table: 清空

DROP TABLE base_table;

CREATE TABLE base_table AS

SELECT * FROM reporting_table;

hadoop fs –rm –r /user/hive/incremental_table/*

總結:

  1. Oozie 可以將這4步統一做成一個工作流,方便排程

  2. 可以用指令碼自定義工作流,就像資料倉庫的 ETL 一樣