1. 程式人生 > >使用confluent本地安裝和使用kafka

使用confluent本地安裝和使用kafka

轉載請註明出處:使用confluent本地安裝和使用kafka

confluent簡介

confluent是平臺化的工具,封裝了kafka,讓我們可以更方便的安裝和使用監控kafka,作用類似於CDH對於Hadoop。

confluent是由LinkedIn開發出Apache Kafka的團隊成員,基於這項技術創立了新公司Confluent,Confluent的產品也是圍繞著Kafka做的。基本架構如下:

官網
https://www.confluent.io

下載地址
https://www.confluent.io/download/

物理機安裝參考
Confluent Open Source Quick Start (Local)

docker安裝參考
Confluent Open Source Quick Start (Docker)

對比之後感覺比原生的kafka安裝簡單很多,容器是docker容器的版本,對於我們在k8s中使用很方便。

Confluent的元件

Confluent Platform 包括更多的工具和服務,使構建和管理資料流平臺更加容易。
Confluent Control Center(閉源)。管理和監控Kafka最全面的GUI驅動系統。
Confluent Kafka Connectors(開源)。連線SQL資料庫/Hadoop/Hive
Confluent Kafka Clients(開源)。對於其他程式語言,包括C/C++,Python
Confluent Kafka REST Proxy(開源)。允許一些系統通過HTTP和kafka之間傳送和接收訊息。
Confluent Schema Registry(開源)。幫助確定每一個應用使用正確的schema當寫資料或者讀資料到kafka中。

Confluent的安裝

下載地址:
http://www.confluent.io/download
開啟後,顯示最新版本,在右邊填寫資訊後,點選Download下載。

本次我們主要使用REST Proxy,當然底層的broker也是使用confluent的kafka元件,下面簡述安裝步驟:

下載confluent4.0.0

wget http://packages.confluent.io/archive/4.0/confluent-oss-4.0.0-2.11.tar.gz
tar  xvf   confluent-oss-4.0.0-2.11.tar.gz

解壓到指定目錄下
通過檢視目錄的內容,能夠發現,confluent裡面是含有kafka的,也就是說,如果你沒有安裝kafka,那麼可以通過confluent直接對kafka進行安裝。如果已經安裝了kafka,可以使用confluent提供的外掛。

轉載請註明出處:使用confluent本地安裝和使用kafka

自定義配置

我們可以配置自己需要的和對應配置資訊

進入解壓出來的confluent-4.0.0

cd confluent-4.0.0

配置zookeeper

vi  etc/kafka/zookeeper.properties

內容如下:

dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=0

配置kafka的broker

vi etc/kafka/server.properties

內容如下:

broker.id=50
delete.topic.enable=true
listeners=PLAINTEXT://192.168.11.91:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/var/lib/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.11.91:2181
zookeeper.connection.timeout.ms=6000
confluent.support.metrics.enable=true
confluent.support.customer.id=anonymous

配置rest proxy

vi  etc/kafka-rest/kafka-rest.properties

內容如下:

id=kafka-rest-server
#zookeeper.connect=192.168.11.91:2181
bootstrap.servers=PLAINTEXT://localhost:9092

配置schema registry

vi  etc/schema-registry/schema-registry.properties

內容如下:

listeners=http://0.0.0.0:8081
kafkastore.connection.url=192.168.11.91:2181
kafkastore.topic=_schemas
debug=false

啟動服務

啟動kafka-rest

bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties

上面的這種方式是前臺啟動,也可以以後臺方式啟動。

nohup bin/kafka-rest-start   etc/kafka-rest/kafka-rest.properties &

啟動zookeeper

bin/zookeeper-server-start -daemon etc/kafka/zookeeper.properties 

啟動kafka broker

bin/kafka-server-start -daemon  etc/kafka/server.properties 

啟動schema registry

bin/schema-registry-start -daemon  etc/schema-registry/schema-registry.properties 

測試使用

檢視topics
瀏覽器訪問或者curl都可以

http://192.168.11.91:8082/topics

檢視叢集的brokers

curl http://192.168.11.91:8082/brokers

註冊consumer group

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" -H "Accept: application/vnd.kafka.v2+json"   --data '{"name": "my_consumer_instance", "format": "json", "auto.offset.reset": "earliest"}'   http://localhost:8082/consumers/my_test_consumer

把topic和消費者my_consumer關聯起來

curl -X POST -H "Content-Type: application/vnd.kafka.v2+json" --data '{"topics":["bear"]}'   http://localhost:8082/consumers/my_test_consumer/instances/my_consumer_instance/subscription

通過rest介面向bear push資料

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json"           --data '{"records":[{"value":{"name": "testUser"}}]}'     "http://localhost:8082/topics/bear"

通過rest介面消費資料

curl -X GET -H "Accept: application/vnd.kafka.json.v2+json"    http://localhost:8082/consumers/my_test_consumer/instances/my_consumer_instance/records

刪除註冊的consumer例項:

curl -X DELETE -H "Accept: application/vnd.kafka.v2+json"     http://localhost:8082/consumers/my_test_consumer/instances/my_consumer_instance

轉載請註明出處:使用confluent本地安裝和使用kafka

更多資訊參考
https://github.com/confluentinc/kafka-rest