1. 程式人生 > >基於hive的拉鏈表設計實現

基於hive的拉鏈表設計實現

show htm schema ive when form put 增量 上傳

參考http://lxw1234.com/archives/2015/08/473.htm

測試數據

order_2015-08-21

1 2015-08-18 2015-08-18 創建
2 2015-08-18 2015-08-18 創建
3 2015-08-19 2015-08-21 支付
4 2015-08-19 2015-08-21 完成
5 2015-08-19 2015-08-20 支付
6 2015-08-20 2015-08-20 創建
7 2015-08-20 2015-08-21 支付
8 2015-08-21 2015-08-21 創建

order_2015-08-22

1 2015-08-18 2015-08-22 創建
2 2015-08-18 2015-08-22 創建
3 2015-08-19 2015-08-21 支付
4 2015-08-19 2015-08-21 完成
5 2015-08-19 2015-08-20 支付
6 2015-08-20 2015-08-22 創建
7 2015-08-20 2015-08-21 支付
8 2015-08-21 2015-08-22 創建
9 2015-08-22 2015-08-22 創建
10 2015-08-22 2015-08-22 支付

order_2015-08-23

1 2015-08-18 2015-08-23 完成
2 2015-08-18 2015-08-22 創建
3 2015-08-19 2015-08-23 完成
4 2015-08-19 2015-08-21 完成
5 2015-08-19 2015-08-23 完成
6 2015-08-20 2015-08-22 創建
7 2015-08-20 2015-08-21 支付
8 2015-08-21 2015-08-23 完成
9 2015-08-22 2015-08-22 創建
10 2015-08-22 2015-08-22 支付
11 2015-08-23 2015-08-23 創建
12 2015-08-23 2015-08-23 創建
13 2015-08-23 2015-08-23 支付

-------------------------------------------------------------------------

步驟

# Create the HDFS staging directory for the test data.
# -p creates missing parent directories and does not fail if the directory already exists.
hdfs dfs -mkdir -p /user/hive_remote/warehouse/demoData
# Upload the test data files to the local disk of this host (one rz transfer per file).
rz   # choose order_2015-08-21.txt
rz   # choose order_2015-08-22.txt
rz   # choose order_2015-08-23.txt
# Copy the test data files from the local disk to HDFS.
hdfs dfs -put ./order_2015-08-21.txt /user/hive_remote/warehouse/demoData
hdfs dfs -put ./order_2015-08-22.txt /user/hive_remote/warehouse/demoData
hdfs dfs -put ./order_2015-08-23.txt /user/hive_remote/warehouse/demoData
# Start the Hive CLI (relative path: presumably run from the Hive installation directory).
./bin/hive
-- Recreate the demo database from scratch.
-- CASCADE also drops the tables inside the database; without it (default RESTRICT)
-- DROP DATABASE fails as soon as the database is not empty, which breaks reruns.
DROP DATABASE IF EXISTS demo CASCADE;
CREATE SCHEMA demo;
-- Switch to the demo database.
USE demo;
--------------------------------------
-- Order table standing in for the source system; partitioned by day,
-- tab-delimited text storage. String literals use plain ASCII single quotes.
CREATE EXTERNAL TABLE orders (
    orderid      INT,
    createtime   STRING,
    modifiedtime STRING,
    status       STRING
)
PARTITIONED BY (day STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
-- Initialise the orders table: each uploaded file goes into its own day partition.
LOAD DATA INPATH '/user/hive_remote/warehouse/demoData/order_2015-08-21.txt' INTO TABLE orders PARTITION (day = '2015-08-21');
LOAD DATA INPATH '/user/hive_remote/warehouse/demoData/order_2015-08-22.txt' INTO TABLE orders PARTITION (day = '2015-08-22');
LOAD DATA INPATH '/user/hive_remote/warehouse/demoData/order_2015-08-23.txt' INTO TABLE orders PARTITION (day = '2015-08-23');
-- Verify that the load succeeded: list the partitions, then inspect each one.
SHOW PARTITIONS orders;
------------------------------------------------
SELECT * FROM orders WHERE day = '2015-08-21';
SELECT * FROM orders WHERE day = '2015-08-22';
SELECT * FROM orders WHERE day = '2015-08-23';
-------------------------------------------------
-- ODS layer: incremental order table, partitioned by day; each partition
-- holds the increment extracted for that day.
DROP TABLE IF EXISTS t_ods_orders_inc;
CREATE EXTERNAL TABLE t_ods_orders_inc (
    orderid      INT,
    createtime   STRING,
    modifiedtime STRING,
    status       STRING
)
PARTITIONED BY (day STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;
-- DW layer: order history (zipper) table holding every historical state of an order.
-- dw_start_date / dw_end_date delimit the period during which a row is the valid state;
-- the scripts that fill this table use '9999-12-31' as the end date of the open row.
DROP TABLE IF EXISTS t_dw_orders_his;
CREATE EXTERNAL TABLE t_dw_orders_his (
    orderid       INT,
    createtime    STRING,
    modifiedtime  STRING,
    status        STRING,
    dw_start_date STRING,
    dw_end_date   STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

--全量初始化
--在數據從源業務系統每天正常抽取和刷新到DW訂單歷史表之前,需要做一次全量的初始化,就是從源訂單表中昨天以前的數據全部抽取到ODS,並刷新到DW。

-- Step 1: extract the full initial data set into ODS partition day = '2015-08-20'.
-- Reads source partition 2015-08-21 and keeps orders created on or before 2015-08-20
-- (dates are yyyy-MM-dd strings, so string comparison follows date order).
INSERT OVERWRITE TABLE t_ods_orders_inc PARTITION (day = '2015-08-20')
SELECT
    orderid,
    createtime,
    modifiedtime,
    status
FROM orders
WHERE createtime <= '2015-08-20'
  AND day = '2015-08-21';
-- Verify that the extract succeeded.
SELECT * FROM t_ods_orders_inc WHERE day = '2015-08-20';
-----------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-20
2 2015-08-18 2015-08-18 創建 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-20
5 2015-08-19 2015-08-20 支付 2015-08-20
6 2015-08-20 2015-08-20 創建 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-20
-----------------------------------------------------------
-- Step 2: refresh DW from ODS. Every initial row starts at its createtime and is
-- left open with dw_end_date = '9999-12-31'.
INSERT OVERWRITE TABLE t_dw_orders_his
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    createtime   AS dw_start_date,
    '9999-12-31' AS dw_end_date
FROM t_ods_orders_inc
WHERE day = '2015-08-20';
-- Verify that the refresh succeeded (ordered so the output is deterministic).
SELECT * FROM t_dw_orders_his ORDER BY orderid, dw_start_date;
----------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 創建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 9999-12-31
----------------------------------------------------------

--增量抽取
--增量刷新歷史數據
--從2015-08-22開始,需要每天正常刷新前一天(2015-08-21)的增量數據到歷史表。
-- Extract the 2015-08-21 increment into ODS partition 2015-08-21:
-- orders created or modified on that day, read from the same-day source partition.
INSERT OVERWRITE TABLE t_ods_orders_inc PARTITION (day = '2015-08-21')
SELECT
    orderid,
    createtime,
    modifiedtime,
    status
FROM orders
WHERE (createtime = '2015-08-21' OR modifiedtime = '2015-08-21')
  AND day = '2015-08-21';
-- Verify that the 2015-08-21 increment was extracted.
SELECT * FROM t_ods_orders_inc WHERE day = '2015-08-21';
------------------------------------------------------------------------
3 2015-08-19 2015-08-21 支付 2015-08-21
4 2015-08-19 2015-08-21 完成 2015-08-21
7 2015-08-20 2015-08-21 支付 2015-08-21
8 2015-08-21 2015-08-21 創建 2015-08-21
------------------------------------------------------------------------
-- Extract the 2015-08-22 increment into ODS partition 2015-08-22:
-- orders created or modified on that day, read from the same-day source partition.
INSERT OVERWRITE TABLE t_ods_orders_inc PARTITION (day = '2015-08-22')
SELECT
    orderid,
    createtime,
    modifiedtime,
    status
FROM orders
WHERE (createtime = '2015-08-22' OR modifiedtime = '2015-08-22')
  AND day = '2015-08-22';
-- Verify that the 2015-08-22 increment was extracted.
SELECT * FROM t_ods_orders_inc WHERE day = '2015-08-22';
-------------------------------------------------------------
1 2015-08-18 2015-08-22 創建 2015-08-22
2 2015-08-18 2015-08-22 創建 2015-08-22
6 2015-08-20 2015-08-22 創建 2015-08-22
8 2015-08-21 2015-08-22 創建 2015-08-22
9 2015-08-22 2015-08-22 創建 2015-08-22
10 2015-08-22 2015-08-22 支付 2015-08-22
-------------------------------------------------------------
-- Extract the 2015-08-23 increment into ODS partition 2015-08-23:
-- orders created or modified on that day, read from the same-day source partition.
INSERT OVERWRITE TABLE t_ods_orders_inc PARTITION (day = '2015-08-23')
SELECT
    orderid,
    createtime,
    modifiedtime,
    status
FROM orders
WHERE day = '2015-08-23'
  AND (createtime = '2015-08-23' OR modifiedtime = '2015-08-23');
-- Verify that the 2015-08-23 increment was extracted.
SELECT * FROM t_ods_orders_inc WHERE day = '2015-08-23';
--secc

-- Refresh the history table from the DW history (data date 2015-08-20) and the
-- ODS increment of 2015-08-21; the result is staged in t_dw_orders_his_tmp.
-- First branch of the UNION ALL: existing history rows. A row whose order appears in
-- the increment and is still open (dw_end_date > '2015-08-21') is closed at '2015-08-20';
-- all other rows keep their dw_end_date.
-- Second branch: the increment rows, opened with dw_start_date = modifiedtime and
-- dw_end_date = '9999-12-31'.
-- The join subquery selects only orderid, the single column the outer query uses.
DROP TABLE IF EXISTS t_dw_orders_his_tmp;
CREATE TABLE t_dw_orders_his_tmp AS
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    dw_start_date,
    dw_end_date
FROM (
    SELECT
        a.orderid,
        a.createtime,
        a.modifiedtime,
        a.status,
        a.dw_start_date,
        CASE
            WHEN b.orderid IS NOT NULL AND a.dw_end_date > '2015-08-21' THEN '2015-08-20'
            ELSE a.dw_end_date
        END AS dw_end_date
    FROM t_dw_orders_his a
    LEFT OUTER JOIN (
        SELECT orderid
        FROM t_ods_orders_inc
        WHERE day = '2015-08-21'
    ) b
        ON (a.orderid = b.orderid)
    UNION ALL
    SELECT
        orderid,
        createtime,
        modifiedtime,
        status,
        modifiedtime AS dw_start_date,
        '9999-12-31' AS dw_end_date
    FROM t_ods_orders_inc
    WHERE day = '2015-08-21'
) x
ORDER BY orderid, dw_start_date;
-- Verify the staged result.
SELECT * FROM t_dw_orders_his_tmp ORDER BY orderid, dw_start_date;
-----------------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 創建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 創建 2015-08-21 9999-12-31

-----------------------------------------------------------------
--其中:
--UNION ALL的兩個結果集中,第一個是用歷史表left outer join 日期為 ${yyyy-MM-dd} 的增量,能關聯上的,並且dw_end_date > ${yyyy-MM-dd},說明狀態有變化,則把原來的dw_end_date置為(${yyyy-MM-dd} - 1), 俗稱閉鏈 。關聯不上的,說明狀態無變化,dw_end_date無變化。


--第二個結果集是直接將增量數據插入歷史表。

--新插入的記錄 dw_end_date 設為9999-12-31,俗稱開鏈。

-- Finally, overwrite the history table with the staged rows.
-- Columns are listed explicitly because the insert is positional and must not
-- depend on the column order of the temp table.
INSERT OVERWRITE TABLE t_dw_orders_his
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    dw_start_date,
    dw_end_date
FROM t_dw_orders_his_tmp;
-- Verify the history table.
SELECT * FROM t_dw_orders_his ORDER BY orderid, dw_start_date;
---------------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
2 2015-08-18 2015-08-18 創建 2015-08-18 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 創建 2015-08-20 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 創建 2015-08-21 9999-12-31
---------------------------------------------------------------

------------------------------------------
-- Refresh the history table with the 2015-08-22 increment; the result is staged
-- in t_dw_orders_his_tmp.
-- First branch of the UNION ALL: existing history rows. A row whose order appears in
-- the increment and is still open (dw_end_date > '2015-08-22') is closed at '2015-08-21';
-- all other rows keep their dw_end_date.
-- Second branch: the increment rows, opened with dw_start_date = modifiedtime and
-- dw_end_date = '9999-12-31'.
-- The join subquery selects only orderid, the single column the outer query uses.
DROP TABLE IF EXISTS t_dw_orders_his_tmp;
CREATE TABLE t_dw_orders_his_tmp AS
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    dw_start_date,
    dw_end_date
FROM (
    SELECT
        a.orderid,
        a.createtime,
        a.modifiedtime,
        a.status,
        a.dw_start_date,
        CASE
            WHEN b.orderid IS NOT NULL AND a.dw_end_date > '2015-08-22' THEN '2015-08-21'
            ELSE a.dw_end_date
        END AS dw_end_date
    FROM t_dw_orders_his a
    LEFT OUTER JOIN (
        SELECT orderid
        FROM t_ods_orders_inc
        WHERE day = '2015-08-22'
    ) b
        ON (a.orderid = b.orderid)
    UNION ALL
    SELECT
        orderid,
        createtime,
        modifiedtime,
        status,
        modifiedtime AS dw_start_date,
        '9999-12-31' AS dw_end_date
    FROM t_ods_orders_inc
    WHERE day = '2015-08-22'
) x
ORDER BY orderid, dw_start_date;

-- Finally, overwrite the history table with the staged rows.
-- Columns are listed explicitly because the insert is positional and must not
-- depend on the column order of the temp table.
INSERT OVERWRITE TABLE t_dw_orders_his
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    dw_start_date,
    dw_end_date
FROM t_dw_orders_his_tmp;
-- Verify the refresh (ordered so the output is deterministic).
SELECT * FROM t_dw_orders_his ORDER BY orderid, dw_start_date;
----------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
1 2015-08-18 2015-08-22 創建 2015-08-22 9999-12-31
2 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
2 2015-08-18 2015-08-22 創建 2015-08-22 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 創建 2015-08-20 2015-08-21
6 2015-08-20 2015-08-22 創建 2015-08-22 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 創建 2015-08-21 2015-08-21
8 2015-08-21 2015-08-22 創建 2015-08-22 9999-12-31
9 2015-08-22 2015-08-22 創建 2015-08-22 9999-12-31
10 2015-08-22 2015-08-22 支付 2015-08-22 9999-12-31
----------------------------------------------------------
-- Snapshot as of 2015-08-21: the rows whose validity period covers that date.
SELECT *
FROM t_dw_orders_his
WHERE dw_start_date <= '2015-08-21'
  AND dw_end_date >= '2015-08-21'
ORDER BY orderid;
----------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
2 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
3 2015-08-19 2015-08-21 支付 2015-08-21 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 9999-12-31
6 2015-08-20 2015-08-20 創建 2015-08-20 2015-08-21
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 創建 2015-08-21 2015-08-21
----------------------------------------------------------
-- Snapshot as of 2015-08-22: the rows whose validity period covers that date.
SELECT *
FROM t_dw_orders_his
WHERE dw_start_date <= '2015-08-22'
  AND dw_end_date >= '2015-08-22'
ORDER BY orderid;
-- Refresh the history table with the 2015-08-23 increment; the result is staged
-- in t_dw_orders_his_tmp.
-- First branch of the UNION ALL: existing history rows. A row whose order appears in
-- the increment and is still open (dw_end_date > '2015-08-23') is closed at '2015-08-22';
-- all other rows keep their dw_end_date.
-- Second branch: the increment rows, opened with dw_start_date = modifiedtime and
-- dw_end_date = '9999-12-31'.
-- The join subquery selects only orderid, the single column the outer query uses.
DROP TABLE IF EXISTS t_dw_orders_his_tmp;
CREATE TABLE t_dw_orders_his_tmp AS
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    dw_start_date,
    dw_end_date
FROM (
    SELECT
        a.orderid,
        a.createtime,
        a.modifiedtime,
        a.status,
        a.dw_start_date,
        CASE
            WHEN b.orderid IS NOT NULL AND a.dw_end_date > '2015-08-23' THEN '2015-08-22'
            ELSE a.dw_end_date
        END AS dw_end_date
    FROM t_dw_orders_his a
    LEFT OUTER JOIN (
        SELECT orderid
        FROM t_ods_orders_inc
        WHERE day = '2015-08-23'
    ) b
        ON (a.orderid = b.orderid)
    UNION ALL
    SELECT
        orderid,
        createtime,
        modifiedtime,
        status,
        modifiedtime AS dw_start_date,
        '9999-12-31' AS dw_end_date
    FROM t_ods_orders_inc
    WHERE day = '2015-08-23'
) x
ORDER BY orderid, dw_start_date;
-- Verify the staged result.
SELECT * FROM t_dw_orders_his_tmp ORDER BY orderid, dw_start_date;
----------------------------------------------------------
1 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
1 2015-08-18 2015-08-22 創建 2015-08-22 2015-08-22
1 2015-08-18 2015-08-23 完成 2015-08-23 9999-12-31
2 2015-08-18 2015-08-18 創建 2015-08-18 2015-08-21
2 2015-08-18 2015-08-22 創建 2015-08-22 9999-12-31
3 2015-08-19 2015-08-21 支付 2015-08-19 2015-08-20
3 2015-08-19 2015-08-21 支付 2015-08-21 2015-08-22
3 2015-08-19 2015-08-23 完成 2015-08-23 9999-12-31
4 2015-08-19 2015-08-21 完成 2015-08-19 2015-08-20
4 2015-08-19 2015-08-21 完成 2015-08-21 9999-12-31
5 2015-08-19 2015-08-20 支付 2015-08-19 2015-08-22
5 2015-08-19 2015-08-23 完成 2015-08-23 9999-12-31
6 2015-08-20 2015-08-20 創建 2015-08-20 2015-08-21
6 2015-08-20 2015-08-22 創建 2015-08-22 9999-12-31
7 2015-08-20 2015-08-21 支付 2015-08-20 2015-08-20
7 2015-08-20 2015-08-21 支付 2015-08-21 9999-12-31
8 2015-08-21 2015-08-21 創建 2015-08-21 2015-08-21
8 2015-08-21 2015-08-22 創建 2015-08-22 2015-08-22
8 2015-08-21 2015-08-23 完成 2015-08-23 9999-12-31
9 2015-08-22 2015-08-22 創建 2015-08-22 9999-12-31
10 2015-08-22 2015-08-22 支付 2015-08-22 9999-12-31
11 2015-08-23 2015-08-23 創建 2015-08-23 9999-12-31
12 2015-08-23 2015-08-23 創建 2015-08-23 9999-12-31
13 2015-08-23 2015-08-23 支付 2015-08-23 9999-12-31
----------------------------------------------------------
-- Finally, overwrite the history table with the staged rows.
-- Columns are listed explicitly because the insert is positional and must not
-- depend on the column order of the temp table.
INSERT OVERWRITE TABLE t_dw_orders_his
SELECT
    orderid,
    createtime,
    modifiedtime,
    status,
    dw_start_date,
    dw_end_date
FROM t_dw_orders_his_tmp;

-- Snapshot as of 2015-08-21: the rows whose validity period covers that date.
SELECT *
FROM t_dw_orders_his
WHERE dw_start_date <= '2015-08-21'
  AND dw_end_date >= '2015-08-21'
ORDER BY orderid;

-- Snapshot as of 2015-08-22.
SELECT *
FROM t_dw_orders_his
WHERE dw_start_date <= '2015-08-22'
  AND dw_end_date >= '2015-08-22'
ORDER BY orderid;

-- Snapshot as of 2015-08-23.
SELECT *
FROM t_dw_orders_his
WHERE dw_start_date <= '2015-08-23'
  AND dw_end_date >= '2015-08-23'
ORDER BY orderid;

實戰過程中可以分三層

一層 從上游系統中原樣抽取到 hive 的數據庫中 如src

二層 從src中抽取每日增量數據 到 hive 的數據庫中 如ods

三層 從ods中抽取數據 到 hive 的數據庫中 如his,即拉鏈表。

基於hive的拉鏈表設計實現