1. 程式人生 > >Kafka的叢集安裝部署

Kafka的叢集安裝部署

架構圖

1)Producer :訊息生產者,就是向kafka broker發訊息的客戶端。

2)Consumer :訊息消費者,向kafka broker取訊息的客戶端

3)Topic :可以理解為一個佇列。

4) Consumer Group (CG):這是kafka用來實現一個topic訊息的廣播(發給所有的consumer)和單播(發給任意一個consumer)的手段。一個topic可以有多個CG。topic的訊息會複製(不是真的複製,是概念上的)到所有的CG,但每個partion只會把訊息發給該CG中的一個consumer。如果需要實現廣播,只要每個consumer有一個獨立的CG就可以了。要實現單播只要所有的consumer在同一個CG。用CG還可以將consumer進行自由的分組而不需要多次傳送訊息到不同的topic。

5)Broker :一臺kafka伺服器就是一個broker。一個叢集由多個broker組成。一個broker可以容納多個topic。

6)Partition:為了實現擴充套件性,一個非常大的topic可以分佈到多個broker(即伺服器)上,一個topic可以分為多個partition,每個partition是一個有序的佇列。partition中的每條訊息都會被分配一個有序的id(offset)。kafka只保證按一個partition中的順序將訊息發給consumer,不保證一個topic的整體(多個partition間)的順序。

7)Offset:kafka的儲存檔案都是按照offset.kafka來命名,用offset做名字的好處是方便查詢。例如你想找位於2049的位置,只要找到2048.kafka的檔案即可。當然the first offset就是00000000000.kafka

修改kafka配置檔案

第一臺機器修改kafka配置檔案server.properties

broker.id=0
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/export/servers/kafka_2.11-1.0.0/logs
num.partitions=2
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=node01:2181,node02:2181,node03:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
delete.topic.enable=true
host.name=node01

第二臺 (同一只需改這兩個即可)

broker.id=1

host.name=node02

第三臺同理 

broker.id=2

host.name=node03

啟動kafka叢集 及關閉

進入kafka的目錄下    後臺啟動命令  三臺都要啟動

nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &     

關閉叢集

bin/kafka-server-stop.sh stop

建立topic

kafka-topics.sh --create --partitions 3 --replication-factor 2 --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181

檢視 是否建立成功(刪除是delete 加上topic名)

bin/kafka-topics.sh --zookeeper node03:2181 --list

模擬生產者

kafka-console-producer.sh --broker-list node01:9092,node02:9092,node03:9092 --topic kafkatopic

模擬消費者(會接受所有的訊息刪除from-beginning 會接收活著的訊息)

kafka-console-consumer.sh --from-beginning --topic kafkatopic --zookeeper node01:2181,node02:2181,node03:2181

 producer寫入流程

 1)producer先從zookeeper的 "/brokers/.../state"節點找到該partition的leader

2)producer將訊息傳送給該leader

3)leader將訊息寫入本地log

4)followers從leader pull訊息,寫入本地log後向leader傳送ACK

5)leader收到所有ISR中的replication的ACK後,增加HW(high watermark,最後commit 的offset)並向producer傳送ACK

Broker 儲存訊息

物理上把topic分成一個或多個patition(對應 server.properties 中的num.partitions=3配置),每個patition物理上對應一個資料夾(該資料夾儲存該patition的所有訊息和索引檔案)

儲存策略

無論訊息是否被消費,kafka都會保留所有訊息。有兩種策略可以刪除舊資料

1)基於時間:log.retention.hours=168

2)基於大小:log.retention.bytes=1073741824

需要注意的是,因為Kafka讀取特定訊息的時間複雜度為O(1),即與檔案大小無關,所以這裡刪除過期檔案與提高 Kafka 效能無關。

Zookeeper儲存結構

注意:producer不在zk中註冊,消費者在zk中註冊