本文原始碼:[GitHub·點這裡](https://github.com/cicadasmile/data-manage-parent) || [GitEE·點這裡](https://gitee.com/cicadasmile/data-manage-parent) # 一、Kafka叢集環境 ## 1、環境版本 ``` 版本:kafka2.11,zookeeper3.4 ``` **注意**:這裡zookeeper3.4也是基於叢集模式部署。 ## 2、解壓重新命名 ``` tar -zxvf kafka_2.11-0.11.0.0.tgz mv kafka_2.11-0.11.0.0 kafka2.11 ``` 建立日誌目錄 ``` [[email protected] kafka2.11]# mkdir logs ``` **注意**:以上操作需要同步到叢集下其他服務上。 ## 3、新增環境變數 ``` vim /etc/profile export KAFKA_HOME=/opt/kafka2.11 export PATH=$PATH:$KAFKA_HOME/bin source /etc/profile ``` ## 4、修改核心配置 ``` [[email protected] /opt/kafka2.11/config]# vim server.properties -- 核心修改如下 # 唯一編號 broker.id=0 # 開啟topic刪除 delete.topic.enable=true # 日誌地址 log.dirs=/opt/kafka2.11/logs # zk叢集 zookeeper.connect=zk01:2181,zk02:2181,zk03:2181 ``` **注意**:broker.id安裝叢集服務個數編排即可,叢集下不能重複。 ## 5、啟動kafka叢集 ``` # 啟動命令 [[email protected] kafka2.11]# bin/kafka-server-start.sh -daemon config/server.properties # 停止命令 [[email protected] kafka2.11]# bin/kafka-server-stop.sh # 程序檢視 [[email protected] kafka2.11]# jps ``` **注意**:這裡預設啟動了zookeeper叢集服務,並且叢集下的kafka分別啟動。 ## 6、基礎管理命令 **建立topic** ``` bin/kafka-topics.sh --zookeeper zk01:2181 \ --create --replication-factor 3 --partitions 1 --topic one-topic ``` 引數說明: - replication-factor 定義副本個數 - partitions 定義分割槽個數 - topic:定義topic名稱 **檢視topic列表** ``` bin/kafka-topics.sh --zookeeper zk01:2181 --list ``` **修改topic分割槽** ``` bin/kafka-topics.sh --zookeeper zk01:2181 --alter --topic one-topic --partitions 5 ``` **檢視topic** ``` bin/kafka-topics.sh --zookeeper zk01:2181 \ --describe --topic one-topic ``` **傳送訊息** ``` bin/kafka-console-producer.sh \ --broker-list 192.168.72.133:9092 --topic one-topic ``` **消費訊息** ``` bin/kafka-console-consumer.sh \ --bootstrap-server 192.168.72.133:9092 --from-beginning --topic one-topic ``` **刪除topic** ``` bin/kafka-topics.sh --zookeeper zk01:2181 \ --delete --topic first ``` ## 7、Zk叢集用處 Kafka叢集中有一個broker會被選舉為Controller,Controller依賴Zookeeper環境,管理叢集broker的上下線,所有topic的分割槽副本分配和leader選舉等工作。 # 二、訊息攔截案例 ## 1、攔截器簡介 Kafka中介軟體的Producer攔截器主要用於實現訊息傳送的自定義控制邏輯。使用者可以在訊息傳送前以及回撥邏輯執行前有機會對訊息做一些自定義,比如訊息修改等,傳送狀態監控等,使用者可以指定多個攔截器按順序執行攔截。 **核心方法** - configure:獲取配置資訊和初始化資料時呼叫; - onSend:訊息被序列化以及和計算分割槽前呼叫該方法,可以對訊息做操作; - onAcknowledgement:訊息傳送到Broker之後,或傳送過程失敗時呼叫; - close:關閉攔截器呼叫,執行一些資源清理工作; **注意**:這裡說的攔截器是針對訊息傳送流程。 ## 2、自定義攔截 **定義方式**:實現ProducerInterceptor介面即可。 **攔截器一**:在onSend方法中,對攔截的訊息進行修改。 ```java @Component public class SendStartInterceptor implements ProducerInt