消息隊列之kafka(集群搭建)
kafka安裝包下載地址:
官網網址:http://kafka.apache.org/quickstart
中文官網:http://kafka.apachecn.org/quickstart.html
在 windows 平臺,從官網下載:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/
在 centos 平臺:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz
(1)集群部署的基礎環境準備:
A: 安裝 JDK 1.8
(2)集體搭建:
版本:kafka_2.11-1.1.0
集群規劃:hadoop01、hadoop02、hadoop03 (三個節點)
① 解壓安裝包到對應的目錄
tar zxvfkafka_2.11-1.1.0.tgz -C /application/
② 修改配置文件
[hadoop@hadoop01 ~]$ cd /application/kafka_2.11-1.1.0/config/
[hadoop@hadoop01 ~]$ vim server.properties
broker.id=5 ## 當前集群中的每個 broker 節點的一個唯一編號,每個節點都不一樣
listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://hadoop01:9092
host.name=hadoop01## 每個節點指定為當前主機名,上面也是
log.dirs=/home/hadoop/data/kafka-logs ## kafkabroke工作節點數據存儲目錄
num.partitions=1
log.retention.hours=168 ## 日誌的最長保存時間
zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 ## zookeeper 地址
③ 批量發送
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop02:$PWD
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop03:$PWD
千萬註意:要修改$KAFKA_HOME/config/server.properties 文件中的對應 broker 節點的信息
broker.id=your broker id
host.name=your broker hostname
advertised.listeners=PLAINTEXT:// your broker hostname:9092
④ 配置環境變量
[hadoop@hadoop01 application]$ sudo etc/profile
export KAFKA_HOME=/application/kafka_2.11-1.1.0
[hadoop@hadoop01 application]$source/etc/profile
⑤ 啟動集群,進行驗證(每一個節點都要啟動)
nohup kafka-server-start.sh /application/kafka_2.11-1.1.0/config/server.properties 1>~/logs/kafka_std.log 2>~/logs/kafka_err.log &
2.kafka相關API
(1)啟動集群每個節點的進程
nohup kafka-server-start.sh /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties 1>~/logs/kafka_std.log \ 2>~/logs/kafka_err.log &
(2) 創建 topic
kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --replication-factor 1 --partitions 1 --topic kafka_test
**參數介紹**
--create 創建 kafka topic
--zookeeper 指定 kafka 的 zookeeper 地址
--partitions 指定分區的個數
--replication-factor 指定每個分區的副本個數
(3) 查看已經創建的所有 kafka topic
kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --list \
(4) 查看某個指定的 kafka topic 的詳細信息
kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --describe \ #查看詳信息
--topic kafka_test #指定需要查看的topic
Topic:topic的名稱
Partition:topic的分區編號
Leader:負責處理消息和讀寫,leader是從所有節點中隨機選出
Replicas:列出了所有的副本節點,不管節點是否在服務中。
isr:正在服務中的節點。
(5) 開啟生產者模擬生成數據:
kafka-console-producer.sh --broker-list hadoop01:9092 \ # broker的節點列表
--topic kafka_test
(6) 開啟消費者模擬消費數據:
kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --from-beginning \ #從哪裏開始消費
--topic kafka_test
(7) 查看某 topic 某個分區的偏移量最大值和最小值
kafka-run-class.sh kafka.tools.GetOffsetShell --topic kafka_test --time -1 --broker-list hadoop01:9092 --partitions 1
(8) 增加 topic 分區數(這個操作是不被允許的)
kafka-topics.sh --alter --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic kafka_test --partitions 5 /
--replication-factor 2
(9) 刪除 Topic
kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic kafka_test \
消息隊列之kafka(集群搭建)