1. 程式人生 > >kafka資料同步Elasticsearch深入詳解

kafka資料同步Elasticsearch深入詳解

1、kafka同步到Elasticsearch方式?

目前已知常用的方式有四種:
1)logstash_input_kafka外掛;
缺點:不穩定(ES中文社群討論)
2)spark stream同步;
缺點:太龐大
3)kafka connector同步;
4)自寫程式讀取、解析、寫入
這裡寫圖片描述
本文主要基於kafka connector實現kafka到Elasticsearch全量、增量同步。

2、從confluenct說起

LinkedIn有個三人小組出來創業了—正是當時開發出Apache Kafka實時資訊列隊技術的團隊成員,基於這項技術Jay Kreps帶頭創立了新公司Confluent。Confluent的產品圍繞著Kafka做的。
Confluent Platform簡化了連線資料來源到Kafka,用Kafka構建應用程式,以及安全,監控和管理您的Kafka的基礎設施。
confluent組成如下所示:
這裡寫圖片描述

1)Apache Kafka
訊息分發元件,資料採集後先入Kafka。
2)Schema Registry
Schema管理服務,訊息出入kafka、入hdfs時,給資料做序列化/反序列化處理。
3)Kafka Connect
提供kafka到其他儲存的管道服務,此次焦點是從kafka到hdfs,並建立相關HIVE表。
4)Kafka Rest Proxy
提供kafka的Rest API服務。
5)Kafka Clients
提供Client程式設計所需SDK。

預設埠對應表:

元件 | 埠

Apache Kafka brokers (plain text):9092

Confluent Control Center:9021

Kafka Connect REST API:8083

REST Proxy:8082

Schema Registry REST API:8081

ZooKeeper:2181

3、kafka connector介紹。

Kafka 0.9+增加了一個新的特性 Kafka Connect,可以更方便的建立和管理資料流管道。它為Kafka和其它系統建立規模可擴充套件的、可信賴的流資料提供了一個簡單的模型。

通過 connectors可以將大資料從其它系統匯入到Kafka中,也可以從Kafka中匯出到其它系統。

Kafka Connect可以將完整的資料庫注入到Kafka的Topic中,或者將伺服器的系統監控指標註入到Kafka,然後像正常的Kafka流處理機制一樣進行資料流處理。

而匯出工作則是將資料從Kafka Topic中匯出到其它資料儲存系統、查詢系統或者離線分析系統等,比如資料庫、 Elastic Search、 Apache Ignite等。

KafkaConnect有兩個核心概念:Source和Sink。 Source負責匯入資料到Kafka,Sink負責從Kafka匯出資料,它們都被稱為Connector。

kafkaConnect通過Jest實現Kafka對接Elasticsearch。

4、kafka connector安裝

如下,解壓後既可以使用。

[[email protected]_no1 confluent-3.3.0]# pwd
/home/confluent/confluent-3.3.0

[[email protected]_no1 confluent-3.3.0]# ls -al
total 32
drwxrwxr-x. 7 root root 4096 Dec 16 10:08 .
drwxr-xr-x. 3 root root 4096 Dec 20 15:34 ..
drwxr-xr-x. 3 root root 4096 Jul 28 08:30 bin
drwxr-xr-x. 18 root root 4096 Jul 28 08:30 etc
drwxr-xr-x. 2 root root 4096 Dec 21 15:34 logs
-rw-rw-r--. 1 root root 871 Jul 28 08:45 README
drwxr-xr-x. 10 root root 4096 Jul 28 08:30 share
drwxrwxr-x. 2 root root 4096 Jul 28 08:45 src

5、kafka connector模式

Kafka connect 有兩種工作模式
1)standalone:在standalone模式中,所有的worker都在一個獨立的程序中完成。

2)distributed:distributed模式具有高擴充套件性,以及提供自動容錯機制。你可以使用一個group.ip來啟動很多worker程序,在有效的worker程序中它們會自動的去協調執行connector和task,如果你新加了一個worker或者掛了一個worker,其他的worker會檢測到然後在重新分配connector和task。

6、kafka connector同步步驟

前提:

$ confluent start

如下的服務都需要啟動:

Starting zookeeper
zookeeper is [UP] ——對應埠:2181
Starting kafka
kafka is [UP]——對應埠:9092
Starting schema-registry
schema-registry is [UP]——對應埠:8081
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]

可以,netstat -natpl 檢視埠是否監聽ok。

步驟1:建立topic

./kafka-topics.sh --create --zookeeper 110.118.7.11 :2181 --replication-factor 3 --partitions 1 --topic  test-elasticsearch-sink

步驟2:生產者釋出訊息

假定avrotest topic已經建立。

./kafka-avro-console-producer  --broker-list 110.118.7.11:9092 --topic test-elasticsearch-sink \
         --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}'

{"f1": "value1"}
{"f1": "value2"}
{"f1": "value3"}

步驟3:消費者訂閱訊息測試(驗證生產者訊息可以接收到)

./kafka-avro-console-consumer --bootstrap-server 110.118.7.11:9092 :9092 --topic  test-elasticsearch-sink --from-beginning

步驟4:connector傳輸資料操作到ES

./connect-standalone ../etc/schema-registry/connect-avro-standalone.properties \
../etc/kafka-connect-elasticsearch/quickstart-elasticsearch.properties

注意此處: connect-standalone模式,對應 connect-avro-standalone.properties要修改;
如果使用connect-distribute模式,對應的connect-avro-distribute.properties要修改。
這裡 quickstart-elasticsearch.properties :啟動到目的Elasticsearch配置。

quickstart-elasticsearch.properties**設定**:

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
#kafka主題名稱,也是對應Elasticsearch索引名稱
topics= test-elasticsearch-sink

key.ignore=true
#ES url資訊
connection.url=http://110.18.6.20:9200
#ES type.name固定
type.name=kafka-connect

7、同步效果。

這裡寫圖片描述

curl -XGET 'http:// 110.18.6.20 :9200/test-elasticsearch-sink/_search?pretty'

8、連線資訊查詢REST API

    -
GET /connectors – 返回所有正在執行的connector名。
- POST /connectors – 新建一個connector; 請求體必須是json格式並且需要包含name欄位和config欄位,name是connector的名字,config是json格式,必須包含你的connector的配置資訊。
- GET /connectors/{name} – 獲取指定connetor的資訊。
- GET /connectors/{name}/config – 獲取指定connector的配置資訊。
- PUT /connectors/{name}/config – 更新指定connector的配置資訊。
- GET /connectors/{name}/status – 獲取指定connector的狀態,包括它是否在執行、停止、或者失敗,如果發生錯誤,還會列出錯誤的具體資訊。
- GET /connectors/{name}/tasks – 獲取指定connector正在執行的task。
- GET /connectors/{name}/tasks/{taskid}/status – 獲取指定connector的task的狀態資訊。
- PUT /connectors/{name}/pause – 暫停connector和它的task,停止資料處理知道它被恢復。
- PUT /connectors/{name}/resume – 恢復一個被暫停的connector。
- POST /connectors/{name}/restart – 重啟一個connector,尤其是在一個connector執行失敗的情況下比較常用
- POST /connectors/{name}/tasks/{taskId}/restart – 重啟一個task,一般是因為它執行失敗才這樣做。
- DELETE /connectors/{name} – 刪除一個connector,停止它的所有task並刪除配置。

9、小結。

他山之石,可以攻玉。
kafka上的小學生,繼續加油!

參考:

——————————————————————————————————
更多ES相關實戰乾貨經驗分享,請掃描下方【銘毅天下】微信公眾號二維碼關注。
(每週至少更新一篇!)

這裡寫圖片描述
和你一起,死磕Elasticsearch
——————————————————————————————————

2017.12.21 23:24 於家中床前