1. 程式人生 > >kafka(3) -- 連線匯入、匯出資料

kafka(3) -- 連線匯入、匯出資料

有些場景下Kafka需要使用其他來源的資料或匯出Kafka的資料到其他系統,相對於許多系統需要編寫定製整合的程式碼,使用Kafka連線到系統去匯入或匯出資料更加簡單。

Kafka Connect是包括在Kafka中一個工具,用來匯入匯出資料到Kafka。它是connectors的一個可擴充套件工具,其執行定製邏輯,用於與外部系統互動。本文介紹如何使用Kafka Connect做一些簡單的聯結器從一個檔案匯入資料到Kafka的主題,和將主題資料匯出到另一個檔案,具體操作如下:

1. 建立原始資料檔案

echo -e "foo/bar" > test.txt     #檔名不要修改

test.txt檔案中的內容即要匯入到kafka中的資料。

2. 啟動聯結器
啟動兩個執行在獨立模式的聯結器,聯結器在一個單一的,區域性的,專用的程序中執行。需要提供三個配置檔案作為引數。第一個引數為Kafka連線過程中的公共配置檔案,如要連線到的Kafka的代理伺服器的配置和資料的序列化格式的配置。其餘兩個配置檔案用來建立指定的聯結器。這些檔案包括一個唯一的聯結器名稱,需要例項化的聯結器類,還有建立該聯結器所需的其他配置:

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file
-sink.properties

用這些Kafka的示例配置檔案,使用前面已經啟動的本地群集的預設配置,建立兩個聯結器:第一是一個源聯結器,其從輸入檔案中讀取每行的內容,釋出到的Kafka主題和第二個是一個sink聯結器負責從Kafka主題讀取訊息,生產出的訊息按行輸出到檔案。

3. 檢查資料匯入、匯出
在啟動過程中可以看到聯結器被例項化的資訊,一旦Kafka Connect程序已經起來,源聯結器會從test.txt讀取每行的訊息,並將其生產釋出到主題connect-test,而sink聯結器會從主題connect-test讀取訊息,並將其寫入檔案test.sink.txt。可以通過檢查輸出檔案的內容來驗證資料都已通過整個管道輸送:

#cat test.sink.txt
  foo
  bar

由於資料被儲存在Kafka topic connect-test中,所以我們也可以執行控制檯消費者消費topic中的資料或者自定義消費者邏輯消費訊息:

# bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...

由於聯結器不停的處理資料,因此我們可以繼續將資料新增到test.txt檔案,並能看到資料通過管道移動:

echo 111 >> test.txt
cat test.sink.txt