1. 程式人生 > >偽分散式kafka安裝與驗證

偽分散式kafka安裝與驗證

基本資訊

安裝環境:Centos7  
# java -version
openjdk version "1.8.0_102"
OpenJDK Runtime Environment (build 1.8.0_102-b14)
OpenJDK 64-Bit Server VM (build 25.102-b14, mixed mode)

版本資訊:
kafka_2.11-2.0.0  zookeeper-3.4.12
安裝組網:偽分散式的三節點zookeeper + 偽分散式的三節點kafka

主機名/etc/hosts
192.168.64.210   zk1
192.168.64.210   zk2
192.168.64.210   zk3
192.168.64.210   kafka1
192.168.64.210   kafka2
192.168.64.210   kafka3

安裝偽分散式ZK

1、上傳安裝包zookeeper-3.4.12.jar到/opt目錄,解壓後修改為目錄名為zk即zookeeper的家目錄為/opt/zk 2、修改配置檔案

/opt/zk/conf/zoo1.cfg
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/mnt/zookeeper/zk1
dataLogDir=/mnt/zklog/zk1
# the port at which the clients will connect
clientPort=2181

server.1=zk1:2888:3888
server.2=zk2:4888:5888
server.3=zk3:6888:7888
/opt/zk/conf/zoo2.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/mnt/zookeeper/zk2
# the port at which the clients will connect
dataLogDir=/mnt/zklog/zk2
clientPort=2182
server.1=zk1:2888:3888
server.2=zk2:4888:5888
server.3=zk3:6888:7888
/opt/zk/conf/zoo3.cfg

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/mnt/zookeeper/zk3
# the port at which the clients will connect
dataLogDir=/mnt/zklog/zk3
clientPort=2183
server.1=zk1:2888:3888
server.2=zk2:4888:5888
server.3=zk3:6888:7888

3、為每個例項建立相應的dataDir和dataLogDir目錄

mkdir -p /mnt/zookeeper/zk1
mkdir -p /mnt/zklog/zk1
mkdir -p /mnt/zookeeper/zk2
mkdir -p /mnt/zklog/zk2
mkdir -p /mnt/zookeeper/zk3
mkdir -p /mnt/zklog/zk3

4、建立myid檔案

echo “1” > /mnt/zookeeper/zk1/myid
echo “2” > /mnt/zookeeper/zk2/myid
echo “3” > /mnt/zookeeper/zk3/myid

5、啟動zookeeper

cd /mnt/zk
bin/zkServer.sh start conf/zool.cfg
bin/zkServer.sh start conf/zoo2.cfg
bin/zkServer.sh start conf/zoo3.cfg

6、檢視狀態

cd /mnt/zk
bin/zkServer.sh status conf/zool.cfg
bin/zkServer.sh status conf/zoo2.cfg
bin/zkServer.sh status conf/zoo3.cfg

安裝偽分散式kafka

1、上傳安裝包kafka_2.11-2.0.0.tgz到/opt目錄,解壓後修改為目錄名為kafka即kafka的家目錄為/opt/kafka 2、修改配置檔案

/opt/kafka/config/server1.properties,修改的引數如下所示,其它引數保留預設值
broker.id=0

listeners = PLAINTEXT://kafka1:9092

log.dirs=/mnt/kafka/kafka1-logs

zookeeper.connect=zk1:2181,zk2:2182,zk3:2183
/opt/kafka/config/server2.properties,修改的引數如下所示,其它引數保留預設值
broker.id=1

listeners = PLAINTEXT://kafka1:9093

log.dirs=/mnt/kafka/kafka2-logs

zookeeper.connect=zk1:2181,zk2:2182,zk3:2183
/opt/kafka/config/server3.properties,修改的引數如下所示,其它引數保留預設值
broker.id=2

listeners = PLAINTEXT://kafka1:9094

log.dirs=/mnt/kafka/kafka3-logs

zookeeper.connect=zk1:2181,zk2:2182,zk3:2183

3、啟動kafka伺服器

cd /opt/kafka
bin/kafka-server-start.sh -daemon config/serverl.properties
bin/kafka-server-start.sh -daemon config/server2.properties
bin/kafka-server-start.sh -daemon config/server3.properties

4、建立Topic: test-topic 

bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2182,zk3:2183 --create --topic test-topic --partitions 3 --replication-factor 3

5、檢視建立的Topic

#bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2182,zk3:2183 --list
test-topic
#bin/kafka-topics.sh --zookeeper zk1:2181,zk2:2182,zk3:2183 --describe --topic test-topic
Topic:test-topic	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: test-topic	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,2,1
	Topic: test-topic	Partition: 1	Leader: 2	Replicas: 1,2,0	Isr: 2,0,1
	Topic: test-topic	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1

6、測試消費傳送與接收

bin/kafka-console-producer.sh --broker-list kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic

bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092,kafka2:9093,kafka3:9094 --topic test-topic --from-beginning

7、生產者吞吐量測試

bin/kafka-producer-perf-test.sh --topic test-topic  --num-records 500000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=kafka1:9092,kafka2:9093,kafka3:9094 acks=-1

[[email protected] kafka]# bin/kafka-producer-perf-test.sh --topic test-topic  --num-records 200000 --record-size 200 --throughput -1 --producer-props bootstrap.servers=kafka1:9092,kafka2:9093,kafka3:9094 acks=-1
41556 records sent, 8291.3 records/sec (1.58 MB/sec), 1434.1 ms avg latency, 4115.0 max latency.
29484 records sent, 5845.4 records/sec (1.11 MB/sec), 7867.2 ms avg latency, 11438.0 max latency.
30724 records sent, 6106.9 records/sec (1.16 MB/sec), 12741.4 ms avg latency, 15933.0 max latency.
33767 records sent, 6672.0 records/sec (1.27 MB/sec), 17947.6 ms avg latency, 20752.0 max latency.
33241 records sent, 6588.9 records/sec (1.26 MB/sec), 22624.3 ms avg latency, 25462.0 max latency.
200000 records sent, 5890.842685 records/sec (1.12 MB/sec), 14432.05 ms avg latency, 27844.00 ms max latency, 15604 ms 50th, 27200 ms 95th, 27536 ms 99th, 27806 ms 99.9th.

7、消費者吞吐量測試

bin/kafka-consumer-perf-test.sh  --broker-list kafka1:9092,kafka2:9093,kafka3:9094   --messages 200000 --topic test-topic

[email protected] kafka]# bin/kafka-consumer-perf-test.sh  --broker-list kafka1:9092,kafka2:9093,kafka3:9094   --messages 200000 --topic test-topic
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2018-11-13 03:19:04:046, 2018-11-13 03:19:06:103, 38.1507, 18.5467, 200026, 97241.6140, 109, 1948, 19.5845, 102682.7515