Kafka Connect Details 詳解
目錄
- 1. Kafka Connect Details 詳解
- 1.1. 概覽
- 1.2. 啟動和配置
- 1.2.1. Standalone 單機模式
- 1.2.2. Distribute 分布式模式
- 1.2.3. Connector的配置
- 1.3. Transformations 轉換器
- 1.4. REST API
- 1.5. Kafka Connect 開發詳解
- 1.6. Kafka Connect VS Producer Consumer
- 1.6.1. Kafka Connect的優點
- 1.7. 第三方資源
- 1.8. 參考
Kafka Connect Details 詳解
原文地址:http://3gods.com/2017/08/18/Kafka-Connect-Details.html。
概覽
Kafka Connect是在0.9以後加入的功能,主要是用來將其他系統的數據導入到Kafka,然後再將Kafka中的數據導出到另外的系統。
可以用來做實時數據同步的ETL,數據實時分析處理等。
主要有2種模式:Standalone(單機模式)和Distribute(分布式模式)。
單機主要用來開發,測試,分布式的用於生產環境。
啟動和配置
Standalone 單機模式
啟動命令: bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties …]
這麽多的配置,搞毛線啊。說真的,官網的這個,當時我真沒看懂。下面解釋下:
執行單機啟動腳本connect-standalone.sh,將connect-standalone.properties屬性文件傳遞進去作為Worker的配置,
另外的配置就是屬於Connector的配置,會被全部傳遞給SouceConnector或者SinkConnector。
bootstrap.servers:kafka集群地址,例如:10.33.2.1:9092,10.33.2.9:9092,10.33.1.13:9092
key.converter:用來轉換寫入或者讀出kafka中消息的key的,例如:org.apache.kafka.connect.json.JsonConverter。
效果是對指定了key是id=1000,轉換成{“id” : 1000},也可以使用Avro的格式。我做的項目使用的maxwell,發現使用Json轉換後,
字段的雙引號全沒了,冒號變成等號,變成了這種鬼東西{id=1000}。後面直接改成使用String的轉換器:org.apache.kafka.connect.storage.StringConverter。
value.converter:同上,只是用來轉換消息的value的,也就是傳輸的具體數據。
offset.storage.file.filename:默認是:/tmp/connect.offsets。
這個配置要註意,單機模式是需要自己持久化offset的。 Kafka Connect會用這裏配置的文件保存offset。
而且針對producer和consumer(也就是source和sink)需要單獨分別配置:
producer.offset.storage.file.filename=/temp/source-offset
consuer.offset.storage.file.filename=/temp/sink-offset
但是,我單機好像從來沒成功過,每次重啟,都是重頭消費。
Distribute 分布式模式
啟動命令:
bin/connect-distributed.sh config/connect-distributed.properties
分布式模式下,connector類及其配置都是通過Rest API接口提交給kafka的。
但不需要配置保存offset的文件,因為分布式下,都是將offsets,configs和status保存到topics中的。
然後由Worker決定如何存儲配置,分配工作,存儲offsets和task的狀態信息。
切記,為了程序的高可用,這3個topics最好手動創建。
具體命令,請看另外一篇博客:Kafka命令 。
group.id:也就是connect-cluster的group id,這個不能和consumer的goup id沖突。
config.storage.topic:用來保存connector和task的配置的。 單分片,多副本,壓實類型(compacted) 的topic。
因為非壓實的topic在一定配置,觸發條件下,會刪除!!!
這裏的多副本是為了配置一直都可用,建議數量等於Kafka Brokers的數量。單分片,應該是剛開始啟動,初始化的時候, 只有一個線程消費。
offset.storage.topic:用來保存offset的,既有source connect的,也有sink connect的offset。多分片,多副本,壓實的topic。
status.storage.topic:用來存儲task狀態的,多分片,多副本,壓實的topic。
這兩個多分片,多副本配置和一般的topic相同就行了,比如我們是3個副本,5個partition。
上面的3個topic,你都可以用console-consumer進行消費看看,特別是status.storage.topic非常有用,
因為分布式模式下,task因為一些配置,異常關掉,只會顯示xxxTask closed,但是不會顯示異常信息。
而從status.storage.topic中消費出來的消息可以看到具體異常信息。
Connector的配置
name:connector的唯一名字
connector.class:用來連接Kafka集群的類名,也就是你繼承SourceConnector或SinkConnector的實現類,
也就是Connector程序的入口,盡量使用全量路徑名。
tasks.max:task的數量,一個task就是一個線程。task數量設置要小於等於分片partition的數量,多了並發度無法提高。
key.converter:覆蓋掉傳遞給Worker的消息的key轉換類,也就是connect-stadalone.properties
和connect-distirbute.properties中key.converter。
value.converter:同上。
topics:要消費的topic列表,對於sink connector才需要配置。
Transformations 轉換器
用來將消息進行修改,轉換,以及路由的。可以將多個組合起來,作為一個轉換鏈。
個人建議是,這些代碼直接在connector中寫,除非可以部署上去,多個系統公用。
還有一方面是,黑盒的類太多,出問題了後不知道是哪裏出問題了,而且也不能本地debug。
再就是著他媽的Transformations配置也太多了,學習成本好高啊。這塊的詳細內容沒看,詳情請看官網。
REST API
這塊就是用來查詢Connector和Task的狀態,主要用於Connector集群的監控。
GET /connectors - 查詢所有connectors
POST /connectors - 提交一個connector。比如是JSON格式,例子:
{
"name": "dis-maxwell-sink",
"config": {
"name" : "maxwell-sink-song",
"connector.class" : "com.cimc.maxwell.sink.MySqlSinkConnector",
"tasks.max": 1,
"topics": "estation.db_ez.t_parcel,estation.db_ez.t_box",
}
}
GET /connectors/{name} - 查詢指定connector信息的
GET /connectors/{name}/config - 查詢指定connector配置的
PUT /connectors/{name}/config - 更新指定connector配置的
GET /connectors/{name}/status - 查詢指定connector狀態的
GET /connectors/{name}/tasks - 查詢指定connector的所有tasks
GET /connectors/{name}/tasks/{taskid}/status - 查詢指定connector的指定task的狀態的,taskid一般是0,1,2之類
PUT /connectors/{name}/pause - 暫停指定connector的,慎用,比如因為系統更新升級,想停掉source connector拉取消息
PUT /connectors/{name}/resume - 恢復上面暫停的connector的
POST /connectors/{name}/restart - 重啟一個connector(connector因為一些原因掛掉了,比如被強行殺死,一般不是異常造成)
POST /connectors/{name}/tasks/{taskId}/restart - 重啟一個指定的task的
DELETE /connectors/{name} - 刪除一個connector
GET /connector-plugins - 獲取所有已安裝的connector插件
PUT /connector-plugins/{connector-type}/config/validate - 校驗connector的配置的屬性類型。
Kafka Connect 開發詳解
詳見我的另外一篇博客:Kafka Connect 開發詳解 。
Kafka Connect VS Producer Consumer
其實Kafka Connect的本質就是將Kafka Client包裝了一層,並對開發者提供統一的實現接口。
Source Connector對應Producer,Sink Connector對應Consumer。
Kafka Connect的優點
1.對開發者提供了統一的實現接口
2.開發,部署和管理都非常方便,統一
3.使用分布式模式進行水平擴展,毫無壓力
4.在分布式模式下可以通過Rest Api提交和管理Connectors
5.對offset自動管理,只需要很簡單的配置,而不像Consumer中需要開發者處理
6.流式/批式處理的支持
第三方資源
這是已經得到支持的組件,不需要做額外的開發: https://www.confluent.io/product/connectors/
括號中的Source表示將數據從其他系統導入Kafka,Sink表示將數據從Kafka導出到其他系統。
其他的我沒看,但是JDBC的實現比較的坑爹,是通過primary key(如id)和時間戳(如updateTime)字段,
來判斷數據是否更新,這樣的話應用範圍非常受局限。
參考
Kafka Documentation
Kafka Connect Details 詳解