1. 程式人生 > >Kafka Connect Details 詳解

Kafka Connect Details 詳解

nec get json格式 err pos print sta 參考 document

目錄

  • 1. Kafka Connect Details 詳解
    • 1.1. 概覽
    • 1.2. 啟動和配置
      • 1.2.1. Standalone 單機模式
      • 1.2.2. Distribute 分布式模式
      • 1.2.3. Connector的配置
    • 1.3. Transformations 轉換器
    • 1.4. REST API
    • 1.5. Kafka Connect 開發詳解
    • 1.6. Kafka Connect VS Producer Consumer
      • 1.6.1. Kafka Connect的優點
    • 1.7. 第三方資源
    • 1.8. 參考

Kafka Connect Details 詳解

原文地址:http://3gods.com/2017/08/18/Kafka-Connect-Details.html。

概覽

Kafka Connect是在0.9以後加入的功能,主要是用來將其他系統的數據導入到Kafka,然後再將Kafka中的數據導出到另外的系統。
可以用來做實時數據同步的ETL,數據實時分析處理等。

主要有2種模式:Standalone(單機模式)和Distribute(分布式模式)。
單機主要用來開發,測試,分布式的用於生產環境。

啟動和配置

Standalone 單機模式

啟動命令: bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties …]
這麽多的配置,搞毛線啊。說真的,官網的這個,當時我真沒看懂。下面解釋下:
執行單機啟動腳本connect-standalone.sh,將connect-standalone.properties屬性文件傳遞進去作為Worker的配置,
另外的配置就是屬於Connector的配置,會被全部傳遞給SouceConnector或者SinkConnector。

bootstrap.servers:kafka集群地址,例如:10.33.2.1:9092,10.33.2.9:9092,10.33.1.13:9092

key.converter:用來轉換寫入或者讀出kafka中消息的key的,例如:org.apache.kafka.connect.json.JsonConverter。
效果是對指定了key是id=1000,轉換成{“id” : 1000},也可以使用Avro的格式。我做的項目使用的maxwell,發現使用Json轉換後,
字段的雙引號全沒了,冒號變成等號,變成了這種鬼東西{id=1000}。後面直接改成使用String的轉換器:org.apache.kafka.connect.storage.StringConverter。

value.converter:同上,只是用來轉換消息的value的,也就是傳輸的具體數據。

offset.storage.file.filename:默認是:/tmp/connect.offsets。
這個配置要註意,單機模式是需要自己持久化offset的。 Kafka Connect會用這裏配置的文件保存offset。
而且針對producer和consumer(也就是source和sink)需要單獨分別配置:
producer.offset.storage.file.filename=/temp/source-offset
consuer.offset.storage.file.filename=/temp/sink-offset
但是,我單機好像從來沒成功過,每次重啟,都是重頭消費。

Distribute 分布式模式

啟動命令:
bin/connect-distributed.sh config/connect-distributed.properties
分布式模式下,connector類及其配置都是通過Rest API接口提交給kafka的。
但不需要配置保存offset的文件,因為分布式下,都是將offsets,configs和status保存到topics中的。
然後由Worker決定如何存儲配置,分配工作,存儲offsets和task的狀態信息。
切記,為了程序的高可用,這3個topics最好手動創建。
具體命令,請看另外一篇博客:Kafka命令 。

group.id:也就是connect-cluster的group id,這個不能和consumer的goup id沖突。
config.storage.topic:用來保存connector和task的配置的。 單分片,多副本,壓實類型(compacted) 的topic。
因為非壓實的topic在一定配置,觸發條件下,會刪除!!!
這裏的多副本是為了配置一直都可用,建議數量等於Kafka Brokers的數量。單分片,應該是剛開始啟動,初始化的時候, 只有一個線程消費。

offset.storage.topic:用來保存offset的,既有source connect的,也有sink connect的offset。多分片,多副本,壓實的topic。
status.storage.topic:用來存儲task狀態的,多分片,多副本,壓實的topic。
這兩個多分片,多副本配置和一般的topic相同就行了,比如我們是3個副本,5個partition。

上面的3個topic,你都可以用console-consumer進行消費看看,特別是status.storage.topic非常有用,
因為分布式模式下,task因為一些配置,異常關掉,只會顯示xxxTask closed,但是不會顯示異常信息。
而從status.storage.topic中消費出來的消息可以看到具體異常信息。

Connector的配置

name:connector的唯一名字
connector.class:用來連接Kafka集群的類名,也就是你繼承SourceConnector或SinkConnector的實現類,
也就是Connector程序的入口,盡量使用全量路徑名。
tasks.max:task的數量,一個task就是一個線程。task數量設置要小於等於分片partition的數量,多了並發度無法提高。
key.converter:覆蓋掉傳遞給Worker的消息的key轉換類,也就是connect-stadalone.properties
和connect-distirbute.properties中key.converter。
value.converter:同上。
topics:要消費的topic列表,對於sink connector才需要配置。

Transformations 轉換器

用來將消息進行修改,轉換,以及路由的。可以將多個組合起來,作為一個轉換鏈。
個人建議是,這些代碼直接在connector中寫,除非可以部署上去,多個系統公用。
還有一方面是,黑盒的類太多,出問題了後不知道是哪裏出問題了,而且也不能本地debug。
再就是著他媽的Transformations配置也太多了,學習成本好高啊。這塊的詳細內容沒看,詳情請看官網。

REST API

這塊就是用來查詢Connector和Task的狀態,主要用於Connector集群的監控。
GET /connectors - 查詢所有connectors
POST /connectors - 提交一個connector。比如是JSON格式,例子:

{
 "name": "dis-maxwell-sink",
 "config": {
   "name" : "maxwell-sink-song",
   "connector.class" : "com.cimc.maxwell.sink.MySqlSinkConnector",
   "tasks.max": 1,
   "topics": "estation.db_ez.t_parcel,estation.db_ez.t_box",
   }
 }

GET /connectors/{name} - 查詢指定connector信息的
GET /connectors/{name}/config - 查詢指定connector配置的
PUT /connectors/{name}/config - 更新指定connector配置的
GET /connectors/{name}/status - 查詢指定connector狀態的
GET /connectors/{name}/tasks - 查詢指定connector的所有tasks
GET /connectors/{name}/tasks/{taskid}/status - 查詢指定connector的指定task的狀態的,taskid一般是0,1,2之類
PUT /connectors/{name}/pause - 暫停指定connector的,慎用,比如因為系統更新升級,想停掉source connector拉取消息
PUT /connectors/{name}/resume - 恢復上面暫停的connector的
POST /connectors/{name}/restart - 重啟一個connector(connector因為一些原因掛掉了,比如被強行殺死,一般不是異常造成)
POST /connectors/{name}/tasks/{taskId}/restart - 重啟一個指定的task的
DELETE /connectors/{name} - 刪除一個connector
GET /connector-plugins - 獲取所有已安裝的connector插件
PUT /connector-plugins/{connector-type}/config/validate - 校驗connector的配置的屬性類型。

Kafka Connect 開發詳解

詳見我的另外一篇博客:Kafka Connect 開發詳解 。

Kafka Connect VS Producer Consumer

其實Kafka Connect的本質就是將Kafka Client包裝了一層,並對開發者提供統一的實現接口。
Source Connector對應Producer,Sink Connector對應Consumer。

Kafka Connect的優點

1.對開發者提供了統一的實現接口
2.開發,部署和管理都非常方便,統一
3.使用分布式模式進行水平擴展,毫無壓力
4.在分布式模式下可以通過Rest Api提交和管理Connectors
5.對offset自動管理,只需要很簡單的配置,而不像Consumer中需要開發者處理
6.流式/批式處理的支持

第三方資源

這是已經得到支持的組件,不需要做額外的開發: https://www.confluent.io/product/connectors/
括號中的Source表示將數據從其他系統導入Kafka,Sink表示將數據從Kafka導出到其他系統。
其他的我沒看,但是JDBC的實現比較的坑爹,是通過primary key(如id)和時間戳(如updateTime)字段,
來判斷數據是否更新,這樣的話應用範圍非常受局限。

參考

Kafka Documentation

Kafka Connect Details 詳解