1. 程式人生 > >消息隊列之kafka(集群搭建)

消息隊列之kafka(集群搭建)

日誌 roc listeners rom watermark connect 信息 href 分區

1.kafka集群搭建

  kafka安裝包下載地址:
 官網網址:http://kafka.apache.org/quickstart
 中文官網:http://kafka.apachecn.org/quickstart.html
 在 windows 平臺,從官網下載:http://mirrors.hust.edu.cn/apache/kafka/1.1.0/
 在 centos 平臺:wgethttp://mirrors.hust.edu.cn/apache/kafka/1.1.0/kafka_2.11-1.1.0.tgz

(1)集群部署的基礎環境準備:

 A: 安裝 JDK 1.8

 A: 安裝 zookeeper 集群(也可以使用自帶 ZooKeeper,但是不推薦)

(2)集體搭建:

  版本:kafka_2.11-1.1.0
 集群規劃:hadoop01、hadoop02、hadoop03 (三個節點)
① 解壓安裝包到對應的目錄
 tar zxvfkafka_2.11-1.1.0.tgz -C /application/
② 修改配置文件
 [hadoop@hadoop01 ~]$ cd /application/kafka_2.11-1.1.0/config/
 [hadoop@hadoop01 ~]$ vim server.properties


  broker.id=5 ## 當前集群中的每個 broker 節點的一個唯一編號,每個節點都不一樣
技術分享圖片

  listeners=PLAINTEXT://:9092
  listeners=PLAINTEXT://hadoop01:9092
  host.name=hadoop01## 每個節點指定為當前主機名,上面也是
技術分享圖片

  log.dirs=/home/hadoop/data/kafka-logs ## kafkabroke工作節點數據存儲目錄
  num.partitions=1
## kafka 的 topic 的默認分區數
技術分享圖片

  log.retention.hours=168 ## 日誌的最長保存時間
技術分享圖片

  zookeeper.connect=hadoop01:2181,hadoop02:2181,hadoop03:2181 ## zookeeper 地址
技術分享圖片
③ 批量發送
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop02:$PWD
[hadoop@hadoop01 application]$scp -r /application/kafka_2.11-1.1.0/ hadoop03:$PWD
千萬註意:要修改$KAFKA_HOME/config/server.properties 文件中的對應 broker 節點的信息

broker.id=your broker id 
host.name=your broker hostname 
advertised.listeners=PLAINTEXT:// your broker hostname:9092

④ 配置環境變量
[hadoop@hadoop01 application]$ sudo etc/profile
export KAFKA_HOME=/application/kafka_2.11-1.1.0
[hadoop@hadoop01 application]$source/etc/profile

⑤ 啟動集群,進行驗證(每一個節點都要啟動)

nohup kafka-server-start.sh /application/kafka_2.11-1.1.0/config/server.properties 1>~/logs/kafka_std.log 2>~/logs/kafka_err.log &

2.kafka相關API

(1)啟動集群每個節點的進程

nohup kafka-server-start.sh  /home/hadoop/apps/kafka_2.11-1.1.0/config/server.properties 1>~/logs/kafka_std.log \ 2>~/logs/kafka_err.log &

(2) 創建 topic

kafka-topics.sh --create --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --replication-factor 1 --partitions 1 --topic kafka_test
**參數介紹**
--create 創建 kafka topic
--zookeeper 指定 kafka 的 zookeeper 地址
--partitions 指定分區的個數
--replication-factor 指定每個分區的副本個數

(3) 查看已經創建的所有 kafka topic

kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --list \

(4) 查看某個指定的 kafka topic 的詳細信息

kafka-topics.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --describe \   #查看詳信息
--topic kafka_test  #指定需要查看的topic

技術分享圖片
Topic:topic的名稱
Partition:topic的分區編號
Leader:負責處理消息和讀寫,leader是從所有節點中隨機選出
Replicas:列出了所有的副本節點,不管節點是否在服務中。
isr:正在服務中的節點。
(5) 開啟生產者模擬生成數據:

kafka-console-producer.sh --broker-list hadoop01:9092 \  # broker的節點列表
--topic kafka_test

(6) 開啟消費者模擬消費數據:

kafka-console-consumer.sh --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --from-beginning \  #從哪裏開始消費
--topic kafka_test

(7) 查看某 topic 某個分區的偏移量最大值和最小值

kafka-run-class.sh kafka.tools.GetOffsetShell --topic kafka_test --time -1 --broker-list hadoop01:9092 --partitions 1

(8) 增加 topic 分區數(這個操作是不被允許的)

kafka-topics.sh --alter --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic kafka_test --partitions 5 /
--replication-factor 2

(9) 刪除 Topic

kafka-topics.sh --delete --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic kafka_test \

消息隊列之kafka(集群搭建)