1. 程式人生 > >在Kettle裡使用時間戳實現變化資料捕獲(CDC)

在Kettle裡使用時間戳實現變化資料捕獲(CDC)

1. 建立測試表,插入資料。
use test;  
  
create table t_color (  
    id int unsigned not null auto_increment primary key,  
    color varchar(10),  
    create_date datetime,  
    last_update timestamp  
)  engine=myisam;  
  
insert into t_color (color,create_date) values('black',now()),('green',now()),('red',now()),('blue',now());  
select * from t_color;  
2. 建立引數表儲存最後一次的抽取時間。
use test;  
  
-- 建立時間戳表  
create table cdc_time (  
last_load datetime,  
current_load datetime) engine=myisam;  
  
-- 初始化資料  
insert into cdc_time values ('1971-01-01 00:00:01','1971-01-01 00:00:01');  
select * from cdc_time;  
3. 建立初始化時間戳轉換


說明:

把current_load時間設定成作業的開始時間。通過“獲取系統資訊”完成這一功能,在這個步驟裡建立一個“系統日期(變)”型別的欄位,欄位名是sysdate。然後建立一個“插入/更新”步驟,把“獲取系統資訊”步驟和“插入/更新”步驟連線起來。在“插入/更新”步驟的“更新欄位”部分裡,用流裡的欄位“sysdate”去更新表裡的欄位“current_load”。另外還要設定“用來查詢的關鍵字”部分,把表的“current_load”的條件設定為“is not null”即可。

4. 建立查詢變化資料的轉換





說明:
從t_color表裡抽取資料的查詢語句使用開始日期和結束日期,左邊閉區間,右邊開區間。查詢條件類似下面的語句:
(create_date >= last_load and create_date < current_load) or (last_update >= last_load and last_update < current_load)

這裡需要兩個表輸入步驟,一個用來從cdc_time表中抽取時間,另一個從t_color表中抽取需要的資料。另外再看查詢條件,可以發現last_load和current_load分別出現兩次。就是說在第一個表輸入步驟中,這些時間值需要被抽取出來兩次。

select   
    last_load last1,  
    current_load cur1,  
    last_load last2,  
    current_load cur2  
from cdc_time;

在t_color表輸入步驟裡,選中“替換 sql 語句裡的變數”,在“從步驟插入資料”下拉列表裡選中上個表輸入步驟。在select語句裡寫入下面的查詢條件:

where (create_date >= ? and create_date < ?) or (last_update >= ? and last_update < ?)
前一個步驟傳來的引數將替換上面語句裡的問號,第一個問號的值是last1,第二個問號的值是cur1,等等。
通過比較create_date和last_update的值是否相等,可以判斷出是新增的還是更改的資料。
case when create_date = last_update then 'new' else 'changed' end as flagfield
把變更資料輸出到文字檔案裡。

5. 建立更新引數表的轉換



說明:

如果轉換中沒有發生任何錯誤,要把current_load欄位裡的值複製到last_load欄位裡。如果轉換中發生了錯誤,時間戳需要保持不變。把current_load欄位裡的值複製到last_load欄位裡需要“執行sql語句”步驟,指令碼如下:

update cdc_time set last_load = current_load;
cdc_time表裡之所以要有兩個欄位,是因為在載入過程中,會有新的資料被插入或更新,為避免髒讀或死鎖的情況,最好給create和update時間戳設定一個上限條件,也就是這裡的current_load欄位。
6. 建立作業

7. 測試
-- 執行作業
-- 檢視diff檔案

-- 檢視cdc_time表

mysql> select * from cdc_time;
+---------------------+---------------------+
| last_load       | current_load      |
+---------------------+---------------------+
| 2014-12-16 11:10:05 | 2014-12-16 11:10:05 |
+---------------------+---------------------+
1 row in set (0.00 sec)

-- 修改資料

delete from t_color where id=3;
update t_color set color='Grey' where id=1;
insert into t_color (color,create_date) values('Yellow',now());
commit;
-- 執行作業
-- 檢視diff檔案

-- 檢視cdc_time表

mysql> select * from cdc_time;
+---------------------+---------------------+
| last_load       | current_load      |
+---------------------+---------------------+
| 2014-12-16 11:16:02 | 2014-12-16 11:16:02 |
+---------------------+---------------------+
1 row in set (0.00 sec)

8. 總結  基於源資料的CDC要求源資料裡有相關的屬性列,ETL過程可以利用這些屬性列,來判斷出哪些資料是增量資料。最常見的屬性列有以下兩種:
  • 時間戳:這種方法至少需要一個更新時間戳,但最好有兩個時間戳:一個插入時間戳,記錄資料行什麼時候建立;一個更新時間戳,記錄資料行什麼時候最後一次更新。
  • 序列:大多數資料庫都有自增序列。如果資料庫表用到了這種序列,就可以很容易識別出新插入的資料。
  這兩種方法都需要一個額外的資料庫表來儲存上一次更新時間或上一次抽取的最後一個序列號。在實踐中,一般是在一個獨立的模式下或在資料緩衝區裡建立這個引數表,不能在資料倉庫裡建立,更不能在資料集市裡建立。基於時間戳和自增序列的方法是CDC最簡單的實現方式,所以也是最常用的方法。但是它的缺點也是很明顯的,主要如下:
  • 區分插入操作和更新操作:只有當源系統包含了插入時間戳和更新時間戳兩個欄位,才能區別插入和更新,否則無法區分。
  • 刪除記錄的操作:不能捕獲到刪除操作,除非是邏輯刪除,即記錄沒有真的刪除,只是做了邏輯上的標誌。
  • 多次更新檢測:如果在一次同步週期內,資料被更新了多次,只能同步最後一次更新操作,中間的更新操作都丟失了。
  • 實時能力:時間戳和基於序列的資料抽取一般適用於批量操作,不適合於實時場景下的資料載入。