1. 程式人生 > >kafka connector 中的輕量級ETL-transfomation功能介紹

kafka connector 中的輕量級ETL-transfomation功能介紹

在kafka connector的使用中,可能因為各種原因(業務原因、connector需要key或者schema等)需要用到transfomation,處理訊息的內容。下面列舉了kafka connector 自帶的transfomation的功能,幫助大家瞭解一下,當然有能力也可以自己開發transfomation元件。

功能 轉換器名稱 轉換器型別 獨有配置 對應功能(修改kafka Message)
插入欄位 InsertField org.apache.kafka.connect.transforms.${name}.(key/value) offset.field
新增offset記錄
partition.field 新增對應kafka分割槽記錄
timestamp.field 新增對應時間戳欄位
topic.field 新增對應kafka分割槽欄位記錄
static.field 新增寫死的記錄名
static.field 新增寫死的記錄值
替換欄位 ReplaceField blacklist 要丟棄的欄位名
whitelist 要改名的欄位列表
renames 改名對應對映:old_fied1:new_filed1,old_fied2:new_filed2
遮擋欄位 MaskField fields 要加密(丟棄)的欄位列表
包裝欄位(整個訊息包裝成一個欄位) HoistField field 被包裝後的欄位名
拆包欄位(只能取一個欄位) ExtractField field 要取出來的欄位
設定訊息Schema SetSchemaMetadata schema.name 設定schema名稱
schema.version 設定schema版本
轉換timestamp欄位型別 TimestampConverter target.type
目標timestamp型別(unix/Date/Time/Timestamp)
field timestamp欄位名
format yyyyMMdd
給kafka訊息新增key ValueToKey fields 將值的哪個欄位作為kafka訊息的key
用時間戳來改變目的表或檔名 TimestampRouter timestamp.format 改變訊息的timestamp格式:yyyyMMdd
topic.format ${topic}${timestamp}呼叫topic名和時間戳,生成欄位裡的新topic名
用正則表示式改變目的表或檔名 RegexRouter regex 該topic名要匹配什麼正則:DC1-TEST-(.*)
replacement 要代替的心topic名:$1
展開巢狀的資料介面 Flatten delimiter 將巢狀的結構展開,指定的分隔符
改變欄位的資料型別 Cast spec 轉換欄位型別foo:int8,bar:float32