1. 程式人生 > >Kafka集群部署及測試

Kafka集群部署及測試

str conn 單位 大數據 list compress baseline 類型 消費者

題記

眼下我們對大數據進行研究方向以Spark為主,當中Spark Streaming是能夠接收動態數據流並進行處理。那麽Spark Streaming支持多源的數據發送端,比如TCP、ZeroMQ、自然也包含Kafka,並且Kafka+SparkStreaming的技術融合也比較經常使用並且成熟,所以我們須要搭建一個Kafka集群進行流數據的測試。

--------------------------------------------------------------------------------------

Blog: http://blog.csdn.net/chinagissoft

QQ群:

idkey=db34317167632c01ab4750de87c000ae63cf173cf6fcbbd724ae60213272da91">16403743

宗旨:專註於"GIS+"前沿技術的研究與交流,將雲計算技術、大數據技術、容器技術、物聯網與GIS進行深度融合,探討"GIS+"技術和行業解決方式

轉載說明:文章同意轉載。但必須以鏈接方式註明源地址。否則追究法律責任!

--------------------------------------------------------------------------------------


環境介紹

眼下我們的環境還是原有的Hadoop集群和Spark集群。

三臺集群,一臺主節點。兩臺子節點。

  • 192.168.12.210 master
  • 192.168.12.211 slave1
  • 192.168.12.212 slave2

相同,我們的目標在這三臺機器部署Kafka集群,另外同一時候包含Zookeeper集群。


眼下安裝的kafka版本號為:kafka_2.10-0.8.2.1(當中2.10是支持的Scala版本號,0.8.2.1是kafka版本號)

下載地址: https://kafka.apache.org/downloads.html


安裝步驟

眼下對於開源軟件的部署,特別是在Linux環境下,基本上為綠色安裝,也就是僅僅須要解壓縮軟件包就可以。

1、在210集群進行操作

解壓縮kafka軟件包

tar –xvf kafka_2.10-0.8.2.1

2、創建zookeeper的數據文件夾並設定server編號

mkdir zk_dir
然後再zk_dir文件夾下創建一個myid文件,編輯該文件,設置一個server唯一編號就可以。比如在210機器上。myid文件僅僅須要輸入1就可以。


3、改動%KAFKA_HOME%/config裏面的配置文件,編輯 config/zookeeper.properties 文件,添加下面配置:

dataDir=/home/supermap/zk_dir/
clientPort=2181
initLimit=5
syncLimit=2
server.1=192.168.12.210:2888:3888
server.2=192.168.12.211:2888:3888
server.3=192.168.12.212:2888:3888

  • tickTime:zookeeper server之間的心跳時間間隔。以毫秒為單位。
  • dataDir:zookeeper 的數據保存文件夾。我們也把 zookeeper server的 ID 文件保存到這個文件夾下
  • clientPort:zookeeper server會監聽這個port。然後等待客戶端連接。

  • initLimit:zookeeper 集群中 follower server和 leader server之間建立初始連接時所能容忍的心跳次數的極限值。

  • syncLimit:zookeeper 集群中 follower server和 leader server之間請求和應答過程中所能容忍的心跳次數的極限值。
  • server.N:N 代表的是 zookeeper 集群server的編號。

    對於配置值,以 192.168.12.210:2888:3888 為例。192.168.12.210 表示該server的 IP 地址,2888 port表示該server與 leader server的數據交換port,3888 表示選舉新的 leader server時候用到的通信port。

4、改動%KAFKA_HOME%/config裏面的配置文件


a. 編輯 config/server.properties 文件

[email protected]:~/kafka_2.10-0.8.2.1/config$ grep ^[a-z] server.properties 
broker.id=1
port=9092
host.name=192.168.12.210
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/home/supermap/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.cleaner.enable=false
zookeeper.connect=192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181
zookeeper.connection.timeout.ms=6000

  • broker.id:Kafka broker 的唯一標識。集群中不能反復。

  • port: Broker 的監聽端口,用於監聽 Producer 或者 Consumer 的連接。

  • host.name:當前 Broker server的 IP 地址或者機器名。
  • zookeeper.contact:Broker 作為 zookeeper 的 client,能夠連接的 zookeeper 的地址信息。
  • log.dirs:日誌保存文件夾。

註意:改動zookeeper.connect為自己集群的IP及port信息,HostName輸入本機的IP。broker.id設置為唯一數字就可以。比如210機器我設置的id=1.


b. 編輯 config/producer.properties 文件

[email protected]:~/kafka_2.10-0.8.2.1/config$ grep ^[a-z] producer.properties 
metadata.broker.list=192.168.12.210:9092,192.168.12.211:9092,192.168.12.212:9092
producer.type=async
compression.codec=none
serializer.class=kafka.serializer.DefaultEncoder

  • broker.list:集群中 Broker 地址列表。
  • producer.type: Producer 類型,async 異步生產者,sync 同步生產者。
c. 編輯 config/consumer.properties 文件
[email protected]:~/kafka_2.10-0.8.2.1/config$ grep ^[a-z] consumer.properties 
zookeeper.connect=192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181
zookeeper.connection.timeout.ms=6000
group.id=test-consumer-group
  • zookeeper.connect: Consumer 能夠連接的 zookeeper server地址列表。

當我們將210機器的配置所有設置完成之後,我們使用scp工具將kafka文件夾和zk_dir文件夾復制到211和212機器上,特別註意須要改動幾個文件。

1、config/server.properties 文件broker.id和hostname

2、zk_dir/myid

簡單的能夠將兩個id相應起來,可是必需要保證集群環境中ID唯一


服務啟動

對於kafka集群我們須要啟動兩個服務。

1、分別在三臺不同集群啟動zookeeper服務

前臺啟動:sh /home/supermap/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/zookeeper.properties

後臺啟動:nohup sh /home/supermap/kafka_2.10-0.8.2.1/bin/zookeeper-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/zookeeper.properties &


註意:一開始啟動第一臺機器,服務信息會提示不能連接其它兩臺機器的消息輸出。

這個是正常的。假設其它兩臺機器服務都啟動了就沒有問題了。

[2016-03-21 23:12:04,946] WARN Cannot open channel to 2 at election address /192.168.12.211:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)
[2016-03-21 23:12:04,949] WARN Cannot open channel to 3 at election address /192.168.12.212:3888 (org.apache.zookeeper.server.quorum.QuorumCnxManager)
java.net.ConnectException: Connection refused
	at java.net.PlainSocketImpl.socketConnect(Native Method)
	at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339)
	at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200)
	at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182)
	at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
	at java.net.Socket.connect(Socket.java:579)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectOne(QuorumCnxManager.java:368)
	at org.apache.zookeeper.server.quorum.QuorumCnxManager.connectAll(QuorumCnxManager.java:402)
	at org.apache.zookeeper.server.quorum.FastLeaderElection.lookForLeader(FastLeaderElection.java:840)
	at org.apache.zookeeper.server.quorum.QuorumPeer.run(QuorumPeer.java:762)


2、分別在三臺不同集群啟動kafka服務

前臺啟動:sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/server.properties

後臺啟動:nohup sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-server-start.sh /home/supermap/kafka_2.10-0.8.2.1/config/server.properties &


驗證安裝


1、創建和查看消息主題


連接zookeeper。創建一個名為user-behavior-topic的topic

[email protected]:~$ sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --create > --replication-factor 3 > --partition 3 > --topic user-behavior-topic > --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181
Created topic "user-behavior-topic".

查看此topic屬性

[email protected]:~$ sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --describe --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181 --topic user-behavior-topic
Topic:user-behavior-topic	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: user-behavior-topic	Partition: 0	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
	Topic: user-behavior-topic	Partition: 1	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2
	Topic: user-behavior-topic	Partition: 2	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3

查看已經創建的topic列表

[email protected]:~$ sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --list --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181 user-behavior-topic 
user-behavior-topic


2、創建消息生產者發送消息

sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-console-producer.sh --broker-list 192.168.12.210:9092,192.168.12.211:9092,192.168.12.212:9092 --topic user-behavior-topic

3、創建消息消費者接收消息

sh /home/supermap/kafka_2.10-0.8.2.1/bin/kafka-console-consumer.sh --zookeeper 192.168.12.210:2181,192.168.12.211:2181,192.168.12.212:2181 --from-beginning --topic user-behavior-topic
技術分享










Kafka集群部署及測試