1. 程式人生 > >Kafka安裝及部署

Kafka安裝及部署

cto ane 交互 隊列 apache cleanup .html netstat 介紹

閱讀目錄

  • 一、環境配置
  • 二、操作過程
  • Kafka介紹

安裝及部署

回到頂部

一、環境配置

  • 操作系統:Cent OS 7

  • Kafka版本:0.9.0.0

  • Kafka官網下載:請點擊

  • JDK版本:1.7.0_51

  • SSH Secure Shell版本:XShell 5

回到頂部

二、操作過程

1、下載Kafka並解壓

  • 下載:

curl -L -O http://mirrors.cnnic.cn/apache/kafka/0.9.0.0/kafka_2.10-0.9.0.0.tgz 
  • 解壓:

tar zxvf kafka_2.10-0.9.0.0.tgz 

2、Kafka目錄介紹

  • /bin 操作kafka的可執行腳本,還包含windows下腳本

  • /config 配置文件所在目錄

  • /libs 依賴庫目錄

  • /logs 日誌數據目錄,目錄kafka把server端日誌分為5種類型,分為:server,request,state,log-cleaner,controller

3、配置

  • 配置zookeeper

請參考zookeeper

  • 進入kafka安裝工程根目錄編輯config/server.properties

kafka最為重要三個配置依次為:broker.id、log.dir、zookeeper.connect,kafka server端config/server.properties參數說明和解釋如下:

server.properties配置屬性說明

4、啟動Kafka

  • 啟動

進入kafka目錄,敲入命令 bin/kafka-server-start.sh config/server.properties &
  • 檢測2181與9092端口

netstat -tunlp|egrep "(2181|9092)"
tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          
tcp        0      0 :::9092                     :::*                        LISTEN      28094/java 

說明:

Kafka的進程ID為28094,占用端口為9092

QuorumPeerMain為對應的zookeeper實例,進程ID為19787,在2181端口監聽

5、單機連通性測試

啟動2個XSHELL客戶端,一個用於生產者發送消息,一個用於消費者接受消息。

  • 運行producer,隨機敲入幾個字符,相當於把這個敲入的字符消息發送給隊列。

bin/kafka-console-producer.sh --broker-list 192.168.1.181:9092 --topic test

說明:早版本的Kafka,–broker-list 192.168.1.181:9092需改為–zookeeper 192.168.1.181:2181

  • 運行consumer,可以看到剛才發送的消息列表。

bin/kafka-console-consumer.sh --zookeeper 192.168.1.181:2181 --topic test --from-beginning  
  • 註意:

producer,指定的Socket(192.168.1.181+9092),說明生產者的消息要發往kafka,也即是broker

consumer, 指定的Socket(192.168.1.181+2181),說明消費者的消息來自zookeeper(協調轉發)

上面的只是一個單個的broker,下面我們來實驗一個多broker的集群。

6、搭建一個多個broker的偽集群

剛才只是啟動了單個broker,現在啟動有3個broker組成的集群,這些broker節點也都是在本機上。

(1)為每一個broker提供配置文件

我們先看看config/server0.properties配置信息:

技術分享圖片
broker.id=0
listeners=PLAINTEXT://:9092
port=9092
host.name=192.168.1.181
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
技術分享圖片
  • 說明:

broker.id為集群中唯一的標註一個節點,因為在同一個機器上,所以必須指定不同的端口和日誌文件,避免數據被覆蓋。

在上面單個broker的實驗中,為什麽kafka的端口為9092,這裏可以看得很清楚。

kafka cluster怎麽同zookeeper交互的,配置信息中也有體現。

那麽下面,我們仿照上面的配置文件,提供2個broker的配置文件:

  • server1.properties:

按 Ctrl+C 復制代碼 按 Ctrl+C 復制代碼
  • server2.properties:

技術分享圖片
broker.id=2
listeners=PLAINTEXT://:9094
port=9094
host.name=192.168.1.181
num.network.threads=4
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/tmp/kafka-logs2
num.partitions=5
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.1.181:2181
zookeeper.connection.timeout.ms=6000
queued.max.requests =500
log.cleanup.policy = delete
技術分享圖片
(2)啟動所有的broker

命令如下:

bin/kafka-server-start.sh config/server0.properties &   #啟動broker0
bin/kafka-server-start.sh config/server1.properties & #啟動broker1
bin/kafka-server-start.sh config/server2.properties & #啟動broker2

查看2181、9092、9093、9094端口

netstat -tunlp|egrep "(2181|9092|9093|9094)"
tcp        0      0 :::9093                     :::*                        LISTEN      29725/java          
tcp        0      0 :::2181                     :::*                        LISTEN      19787/java          
tcp        0      0 :::9094                     :::*                        LISTEN      29800/java          
tcp        0      0 :::9092                     :::*                        LISTEN      29572/java  

一個zookeeper在2181端口上監聽,3個kafka cluster(broker)分別在端口9092,9093,9094監聽。

(3)創建topic
bin/kafka-topics.sh --create --topic topic_1 --partitions 1 --replication-factor 3  \--zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_2 --partitions 1 --replication-factor 3  \--zookeeper localhost:2181
bin/kafka-topics.sh --create --topic topic_3 --partitions 1 --replication-factor 3  \--zookeeper localhost:2181

查看topic創建情況:

bin/kafka-topics.sh --list --zookeeper localhost:2181
test
topic_1
topic_2
topic_3
[root@atman081 kafka_2.10-0.9.0.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 0	Replicas: 0,2,1	Isr: 0,2,1

上面的有些東西,也許還不太清楚,暫放,繼續試驗。需要註意的是topic_1的Leader=2

(4)模擬客戶端發送,接受消息
  • 發送消息

bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
  • 接收消息

bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning

需要註意,此時producer將topic發布到了3個broker中,現在就有點分布式的概念了。

(5) kill some broker

kill broker(id=0)

首先,我們根據前面的配置,得到broker(id=0)應該在9092監聽,這樣就能確定它的PID了。

broker0沒kill之前topic在kafka cluster中的情況

bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1,0
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1,0

kill之後,再觀察,做下對比。很明顯,主要變化在於Isr,以後再分析

bin/kafka-topics.sh --describe --zookeeper localhost:2181
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: -1	Replicas: 0	Isr: 
Topic:topic_1	PartitionCount:1	ReplicationFactor:3	Configs:

	Topic: topic_1	Partition: 0	Leader: 2	Replicas: 2,1,0	Isr: 2,1
Topic:topic_2	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_2	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2
Topic:topic_3	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: topic_3	Partition: 0	Leader: 2	Replicas: 0,2,1	Isr: 2,1

測試下,發送消息,接受消息,是否收到影響。

  • 發送消息

bin/kafka-console-producer.sh --topic topic_1 --broker-list 192.168.1.181:9092,192.168.1.181:9093,192.168.1.181:9094
  • 接收消息

bin/kafka-console-consumer.sh --topic topic_1 --zookeeper 192.168.1.181:2181 --from-beginning

可見,kafka的分布式機制,容錯能力還是挺好的~

回到頂部

Kafka介紹

1、kafka有什麽?

  • producer 消息的生成者,即發布消息

  • consumer 消息的消費者,即訂閱消息

  • broker Kafka以集群的方式運行,可以由一個或多個服務組成,服務即broker

  • zookeeper 協調轉發

2、kafka的工作圖

技術分享圖片

producers通過網絡將消息發送到Kafka集群,集群向消費者提供消息

kafka對消息進行歸納,即topic,也就是說producer發布topic,consumer訂閱topic

參考資料

apache kafka技術分享系列(目錄索引)

Kafka深度解析,眾人推薦,精彩好文!

Kafka安裝及部署