1. 程式人生 > >2.Kafka_Zookeeper叢集搭建

2.Kafka_Zookeeper叢集搭建

在同一臺機器上搭建zk叢集和kfaka叢集。

一.Zookeeper叢集搭建

1.建立zk安裝目錄

[[email protected] ~]# mkdir -p /opt/server/zookeeper/server1/data
[[email protected] ~]# mkdir -p /opt/server/zookeeper/server1/dataLog
[[email protected] ~]# mkdir -p /opt/server/zookeeper/server2/data
[[email protected] ~]# mkdir -p /opt/server/zookeeper/server2/dataLog
[
[email protected]
~]# mkdir -p /opt/server/zookeeper/server3/data [[email protected] ~]# mkdir -p /opt/server/zookeeper/server3/dataLog

2.下載zk-3.3.6,並解壓到上述三個目錄中

#下載
[[email protected] ~]# wget http://mirrors.cnnic.cn/apache/zookeeper/zookeeper-3.3.6/zookeeper-3.3.6.tar.gz
#解壓
[[email protected]
~]# tar -zxvf  zookeeper-3.3.6.tar.gz -C /opt/server/zookeeper/server1 [[email protected] ~]# tar -zxvf  zookeeper-3.3.6.tar.gz -C /opt/server/zookeeper/server2 [[email protected] ~]# tar -zxvf  zookeeper-3.3.6.tar.gz -C /opt/server/zookeeper/server3

3.修改三個zk配置檔案

[[email protected]
~]# cd /opt/server/zookeeper/server1/zookeeper-3.3.6/conf [[email protected] conf]# cp -rf  zoo_sample.cfg   zoo.cfg [[email protected] conf]# vim  zoo.cfg     tickTime=2000     initLimit=10     syncLimit=5     clientPort=2181 #2182,2183     dataDir=/opt/server/zookeeper/server1/data #server2和server3依次修改目錄即可     dataLogDir=/opt/server/zookeeper/server1/dataLog #server2和server3依次修改目錄即可     server.1=127.0.0.1:2888:3888     server.2=127.0.0.1:2889:3889     server.3=127.0.0.1:2890:3890 #建立zk的id編號 [[email protected] conf]# echo "1" > /opt/server/zookeeper/server1/data/myid [[email protected] conf]# echo "2" > /opt/server/zookeeper/server2/data/myid [[email protected] conf]# echo "3" > /opt/server/zookeeper/server3/data/myid

4.依次開啟三個zk

[[email protected] conf]# cd /opt/server/zookeeper/server1/zookeeper-3.3.6/
[[email protected] zookeeper-3.3.6]# bin/zkServer.sh start
[[email protected] zookeeper-3.3.6]# cd /opt/server/zookeeper/server2/zookeeper-3.3.6/
[[email protected] zookeeper-3.3.6]# bin/zkServer.sh start
[[email protected] zookeeper-3.3.6]# cd /opt/server/zookeeper/server3/zookeeper-3.3.6/
[[email protected] zookeeper-3.3.6]# bin/zkServer.sh start

5.客戶端登入驗證

[[email protected] zookeeper-3.3.6]# bin/zkCli.sh -server master:2181

二.Kafka叢集搭建

1.建立kafka安裝目錄

[[email protected] ~]# mkdir -p  /opt/server/kafka/kafka1/kafkaLog
[[email protected] ~]# mkdir -p  /opt/server/kafka/kafka2/kafkaLog
[[email protected] ~]# mkdir -p  /opt/server/kafka/kafka3/kafkaLog

2.下載zk-3.3.6,並解壓到上述三個目錄中

[[email protected] ~]# wget http://apache.fayea.com/kafka/0.10.2.2/kafka_2.11-0.10.2.2.tgz
[[email protected] ~]# tar -zxvf kafka_2.11-0.10.2.2 -C /opt/server/kafka/kafka1
[[email protected] ~]# vi /opt/server/kafka/kafka1/kafka_2.11-0.10.2.2/config/server.properties
broker.id=0 #broker.id=1、broker.id=2
log.dirs =/opt/server/kafka/kafka1/kafkaLog
listeners=PLAINTEXT://master:9092 #port=9093、port=9094
port=9092 #port=9093、port=9094
zookeeper.connect=master:2181,master:2182,master:2183
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880
[[email protected] ~]# vi /opt/server/kafka/kafka1/kafka_2.11-0.10.2.2/config/consumer.properties
zookeeper.connect=master:2181,master:2182,master:2183
group.id=logGroup

同樣解壓到kafka2、kafka3中,更改配置。唯一跟kafka1的區別就是broker.id和port。
3.開啟服務

[[email protected] ~]# cd /opt/server/kafka/kafka1/kafka_2.11-0.10.2.2/bin
[[email protected] ~]# ./kafka-server-start.sh  ../config/server.properties  &
[[email protected] ~]# cd /opt/server/kafka/kafka2/kafka_2.11-0.10.2.2/bin
[[email protected] ~]# ./kafka-server-start.sh  ../config/server.properties  &
[[email protected] ~]# cd /opt/server/kafka/kafka3/kafka_2.11-0.10.2.2/bin
[[email protected] ~]# ./kafka-server-start.sh  ../config/server.properties  &

三.Kafka基本應用

1.建立主題

讓我們建立一個名為“test”的主題,它只包含一個分割槽,只有一個副本:

1

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

如果我們執行list topic命令,我們現在可以看到該主題:

1

2

> bin/kafka-topics.sh --list --zookeeper localhost:2181

test

或者,您也可以將代理配置為在釋出不存在的主題時自動建立主題,而不是手動建立主題。

2.傳送一些訊息

Kafka附帶一個命令列客戶端,它將從檔案或標準輸入中獲取輸入,並將其作為訊息傳送到Kafka叢集。預設情況下,每行將作為單獨的訊息傳送。

執行生產者,然後在控制檯中鍵入一些訊息以傳送到伺服器。

1

2

3

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

This is a message

This is another message

3.啟動消費者

1

2

3

> bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

This is a message

This is another message

4.設定多代理叢集(具體配置如上方:二.kafka配置)

現在建立一個複製因子為3的新topic:

1

> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

現在我們有一個叢集,我們怎麼知道哪個Agent正在做什麼?要檢視執行“describe topics”命令:

1

2

3

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

    Topic: my-replicated-topic  Partition: 0    Leader: 1   Replicas: 1,2,0 Isr: 1,2,0

這是輸出的解釋。第一行給出了所有分割槽的摘要,每個附加行提供有關一個分割槽的資訊。由於我們只有一個分割槽用於此topic,因此只有一行。

  • “leader”是負責給定分割槽的所有讀取和寫入的節點。每個節點將成為隨機選擇的分割槽部分的領導者。
  • “replicas”是複製此分割槽日誌的節點列表,無論它們是否為領導者,或者即使它們當前處於活動狀態。
  • “isr”是“同步”複製品的集合。這是副本列表的子集,該列表當前處於活躍狀態並且已經被領導者捕獲。

請注意示例中,節點1是該topic的唯一分割槽的領導者。

5.測試容錯性

broker1充當leader,所以讓我們kill它:

1

2

3

> ps aux | grep server-1.properties

7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.8/Home/bin/java...

> kill -9 7564

leader已切換到其中一個從屬節點,節點1不再位於同步副本集中:

1

2

3

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic

Topic:my-replicated-topic   PartitionCount:1    ReplicationFactor:3 Configs:

    Topic: my-replicated-topic  Partition: 0    Leader: 2   Replicas: 1,2,0 Isr: 2,0

但即使最初接受寫入的leader已經失敗,這些訊息仍可供消費。