1. 程式人生 > >ELK叢集-Kafka安裝配置(三)

ELK叢集-Kafka安裝配置(三)

安裝

kafka:

tar -zxvf kafka_2.11-2.1.0.tgz
mv kafka_2.11-2.1.0  /home/kafka-2.11

zookeeper:

tar -zxvf zookeeper-3.4.13.tar.gz
mv zookeeper-3.4.13 /home/zookeeper-3.4.13

配置

kafka => server.properties

#每一個broker在叢集中的唯一表示,要求是正數。當該伺服器的IP地址發生改變時,broker.id沒有變化,則不會影響consumers的訊息情況
broker.id=0

#192.168.1.199-內網ip,PLAINTEXT協議
advertised.listeners=PLAINTEXT://192.168.1.199:9092

#broker處理訊息的最大執行緒數,一般情況下數量為cpu核數	
num.network.threads=2

#broker處理磁碟IO的執行緒數,數值為cpu核數2倍
num.io.threads=8

#socket的傳送緩衝區,socket的調優引數SO_SNDBUFF
socket.send.buffer.bytes=1048576

#socket的接受緩衝區,socket的調優引數SO_RCVBUFF
socket.receive.buffer.bytes=1048576

#socket請求的最大數值,防止serverOOM,message.max.bytes必然要小於socket.request.max.bytes,會被topic建立時的指定引數覆蓋
socket.request.max.bytes=104857600

#kafka資料的存放地址,多個地址的話用逗號分割,多個目錄分佈在不同磁碟上可以提高讀寫效能  /data/kafka-logs-1,/data/kafka-logs-2
log.dirs=/tmp/kafka-logs

#每個topic的分割槽個數,若是在topic建立時候沒有指定的話會被topic建立時的指定引數覆蓋
num.partitions=2

#資料檔案保留多長時間, 儲存的最大時間超過這個時間會根據log.cleanup.policy設定資料清除策略log.retention.bytes和log.retention.minutes或log.retention.hours任意一個達到要求,
#都會執行刪除,有2刪除資料檔案方式:按照檔案大小刪除:log.retention.bytes,按照2中不同時間粒度刪除:分別為分鐘,小時
log.retention.hours=168

#topic的分割槽是以一堆segment檔案儲存的,這個控制每個segment的大小,會被topic建立時的指定引數覆蓋
log.segment.bytes=536870912

#檔案大小檢查的週期時間,是否處罰 log.cleanup.policy中設定的策略
log.retention.check.interval.ms=60000
#是否開啟日誌清理
log.cleaner.enable=false

#zookeeper叢集的地址,可以是多個,多個之間用逗號分割 hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181

#ZooKeeper的連線超時時間
zookeeper.connection.timeout.ms=1000000

#ZooKeeper叢集中leader和follower之間的同步實際那
zookeeper.sync.time.ms =2000

zookeeper => zoo.cfg
新增環境變數

vi  /etc/profile
新增
ZOOKEEPER_HOME=/home/zookeeper-3.4.13
export ZOOKEEPER_HOME
CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$ZOOKEEPER_HOME/lib:	

配置

# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=
10 # The number of ticks that can pass between # sending a request and getting an acknowledgement syncLimit=5 # the directory where the snapshot is stored. # do not use /tmp for storage, /tmp here is just # example sakes. dataDir=/home/zookeeper-3.4.13/data dataLogDir=/home/zookeeper-3.4.13/logs # the port at which the clients will connect clientPort=
2181 # the maximum number of client connections. # increase this if you need to handle more clients #maxClientCnxns=60 # # Be sure to read the maintenance section of the # administrator guide before turning on autopurge. # # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance # # The number of snapshots to retain in dataDir #autopurge.snapRetainCount=3

配置說明:
tickTime:這個時間是作為 Zookeeper 伺服器之間或客戶端與伺服器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。

dataDir:顧名思義就是 Zookeeper 儲存資料的目錄,預設情況下,Zookeeper 將寫資料的日誌檔案也儲存在這個目錄裡。

dataLogDir: log目錄, 同樣可以是任意目錄. 如果沒有設定該引數, 將使用和dataDir相同的設定。

clientPort:這個埠就是客戶端連線 Zookeeper 伺服器的埠,Zookeeper 會監聽這個埠,接受客戶端的訪問請求。

啟動

進入zookeeper下bin目錄,執行

./zkServer.sh start #啟動
netstat -tunlp|grep 2181 #檢視zookeeper埠
./zkServer.sh stop #停止

參考:zookeeper,zoo.cfg配置

進入kafka下bin目錄,執行

./kafka-server-start.sh   ../config/server.properties
或
nohup ./kafka-server-start.sh   ../config/server.properties  & (後臺啟動)

kafka簡單測試

1.建立一個topic
引數資訊:Zookeeper的資訊,Topic的名字,Topic的Partition數,複製因子(複製因子必須小於等於Broker數目)

./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

2.查詢topic列表

./kafka-topics.sh --list --zookeeper localhost:2181

3.Producer建立訊息
啟動時,除了列印SLF4J之外,沒有別的。下面可以直接輸入生產的資料 (生產訊息時不需要指定Partition,Kafka自動做Partition路由,每個Partition都是有Lead Partition和Follower Partitions組成,Lead Partition負責讀寫,而Follower Partitions只做複製,在Lead Partition掛了之後,自動做 Failover )

[[email protected] bin]$ ./kafka-console-producer.sh  --broker-list localhost:9092 --topic test

4.Consumer消費訊息
啟動時,除了列印SLF4J之外,沒有別的 –from-beginning實際上是指定offset的讀取策略。
smallest和largest策略表示Zookeeper上的offset還沒有初始化為正確值時,如何初始化offset的問題?試想,Producer生產了一批訊息到Kafka中,但是Kafka尚未由任何Consumer讀取,而Kafka的Offset是由Consumer進行初始化和賦值的,因此此時的Zookeeper上的offset並沒有預期的0(0表示尚未讀取過),而是一個不正確的隨機數,那麼Consumer來讀取訊息時,是從頭開始讀還是從最大的位置等待Producer建立訊息後再讀取,此時就產生了兩個選擇,smallest表示從頭讀,largest表示從最大位置讀
auto.offset.reset(預設是largest):
What to do when there is no initial offset in ZooKeeper or if an offset is out of range:
smallest : automatically reset the offset to the smallest offset
largest : automatically reset the offset to the largest offset

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

參考:1.kafka配置引數說明
2.kafka安裝
3.kafka,server.properties配置