1. 程式人生 > >Kafka集群搭建

Kafka集群搭建

pic 配置環境變量 處理 jdk1.8 將他 版本 線程 partition connect

本文安裝環境

  操作系統:CentOS6.7 64位

  JDK版本:jdk1.8.0_131(要求JDK1.8以上)

  ZooKeeper版本:3.4.8

  Kakfa版本:0.9.0.1(Scala 2.11)

Java安裝

  需要使用JDK1.8以上版本,安裝和環境變量的配置參考Linux下安裝JDK1.8

ZooKeeper安裝與環境搭建

1.下載軟件包

  到ZooKeeper官網上http://zookeeper.apache.org/下載軟件包,最終是跳轉到鏡像下載,例如:

1 wget "http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.8/zookeeper-3.4.8.tar.gz
" 2 mv zookeeper-3.4.8.tar.gz /opt 3 tar -zxvf zookeeper-3.4.8.tar.gz

2.修改配置文件

  進入到解壓好的目錄裏面的conf目錄中,查看

1 cd /opt/zookeeper-3.4.8/conf
2 ls

  可以看到有一個名為 zoo_sample.cfg 的文件,這個文件是官方給我們的zookeeper的樣板文件,給他復制一份命名為zoo.cfg,zoo.cfg是官方指定的文件命名規則,然後對其中的參數進行修改

1 cp zoo_sample.cfg zoo.cfg
2 vim zoo.cfg

  修改配置文件如下:

 1 tickTime=2000
 2 initLimit=10
 3 syncLimit=5
 4 dataDir=/opt/zookeeper-3.4.8/data
 5 dataLogDir=/opt/zookeeper-3.4.8/datalog
 6 clientPort=2181
 7 server.1=192.168.7.100:2888:3888
 8 server.2=192.168.7.101:2888:3888
 9 server.3=192.168.7.107:2888:3888

  配置文件解釋:

#tickTime:
這個時間是作為 Zookeeper 服務器之間或客戶端與服務器之間維持心跳的時間間隔,也就是每個 tickTime 時間就會發送一個心跳。
#initLimit: 這個配置項是用來配置 Zookeeper 接受客戶端(這裏所說的客戶端不是用戶連接 Zookeeper 服務器的客戶端,而是 Zookeeper 服務器集群中連接到 Leader 的 Follower 服務器)初始化連接時最長能忍受多少個心跳時間間隔數。當已經超過 5個心跳的時間(也就是 tickTime)長度後 Zookeeper 服務器還沒有收到客戶端的返回信息,那麽表明這個客戶端連接失敗。總的時間長度就是 5*2000=10#syncLimit: 這個配置項標識 Leader 與Follower 之間發送消息,請求和應答時間長度,最長不能超過多少個 tickTime 的時間長度,總的時間長度就是5*2000=10秒 #dataDir: 快照日誌的存儲路徑 #dataLogDir: 事物日誌的存儲路徑,如果不配置這個那麽事物日誌會默認存儲到dataDir制定的目錄,這樣會嚴重影響zk的性能,當zk吞吐量較大的時候,產生的事物日誌、快照日誌太多 #clientPort: 這個端口就是客戶端連接 Zookeeper 服務器的端口,Zookeeper 會監聽這個端口,接受客戶端的訪問請求。修改他的端口改大點 #server.1
這個1是服務器的標識也可以是其他的數字, 表示這個是第幾號服務器,用來標識服務器,這個標識要寫到快照目錄下面myid文件裏
192.168.7.107為集群裏的IP地址,第一個端口是master和slave之間的通信端口,默認是2888,第二個端口是leader選舉的端口,集群剛啟動的時候選舉或者leader掛掉之後進行新的選舉的端口默認是3888

  創建myid文件

#server1
echo "1" > /opt/zookeeper-3.4.8/data/myid
#server2
echo "2" > /opt/zookeeper-3.4.8/data/myid
#server3
echo "3" > /opt/zookeeper-3.4.8/data/myid

3.重要配置說明(看看即可)

  1.myid文件和server.myid 在快照目錄下存放的標識本臺服務器的文件,他是整個zk集群用來發現彼此的一個重要標識。
  2.zoo.cfg 文件是zookeeper配置文件 在conf目錄裏。
  3.log4j.properties文件是zk的日誌輸出文件 在conf目錄裏用java寫的程序基本上有個共同點日誌都用log4j,來進行管理。

技術分享
# Define some default values that can be overridden by system properties
zookeeper.root.logger=INFO, CONSOLE  #日誌級別
zookeeper.console.threshold=INFO  #使用下面的console來打印日誌
zookeeper.log.dir=.    #日誌打印到那裏,是咱們啟動zookeeper的目錄 (建議設置統一的日誌目錄路徑)
zookeeper.log.file=zookeeper.log
zookeeper.log.threshold=DEBUG
zookeeper.tracelog.dir=.
zookeeper.tracelog.file=zookeeper_trace.log

#
# ZooKeeper Logging Configuration
#

# Format is "<default threshold> (, <appender>)+

# DEFAULT: console appender only
log4j.rootLogger=${zookeeper.root.logger}

# Example with rolling log file
#log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE

# Example with rolling log file and tracing
#log4j.rootLogger=TRACE, CONSOLE, ROLLINGFILE, TRACEFILE

#
# Log INFO level and above messages to the console
#
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.Threshold=${zookeeper.console.threshold}
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n


# Add ROLLINGFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.ROLLINGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=${zookeeper.log.threshold}
log4j.appender.ROLLINGFILE.File=${zookeeper.log.dir}/${zookeeper.log.file}

# Max log file size of 10MB
log4j.appender.ROLLINGFILE.MaxFileSize=10MB
# uncomment the next line to limit number of backup files
#log4j.appender.ROLLINGFILE.MaxBackupIndex=10

log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L] - %m%n


#
# Add TRACEFILE to rootLogger to get log file output
#    Log DEBUG level and above messages to a log file
log4j.appender.TRACEFILE=org.apache.log4j.FileAppender
log4j.appender.TRACEFILE.Threshold=TRACE
log4j.appender.TRACEFILE.File=${zookeeper.tracelog.dir}/${zookeeper.tracelog.file}

log4j.appender.TRACEFILE.layout=org.apache.log4j.PatternLayout
### Notice we are including log4j‘s NDC here (%x)
log4j.appender.TRACEFILE.layout.ConversionPattern=%d{ISO8601} [myid:%X{myid}] - %-5p [%t:%C{1}@%L][%x] - %m%n

configuration for log4j
log4j.propertise

  4.zkEnv.sh和zkServer.sh文件
  zkServer.sh 主的管理程序文件
  zkEnv.sh 是主要配置,zookeeper集群啟動時配置環境變量的文件
  5.還有一個需要註意
  ZooKeeper server will not remove old snapshots and log files when using the default configuration (see autopurge below), this is the responsibility of the operator
  zookeeper不會主動的清除舊的快照和日誌文件,這個是操作者的責任。
  但是可以通過命令去定期的清理。

技術分享
#!/bin/bash 
 
#snapshot file dir 
dataDir=/opt/zookeeper/zkdata/version-2
#tran log dir 
dataLogDir=/opt/zookeeper/zkdatalog/version-2

#Leave 66 files 
count=66 
count=$[$count+1] 
ls -t $dataLogDir/log.* | tail -n +$count | xargs rm -f 
ls -t $dataDir/snapshot.* | tail -n +$count | xargs rm -f 

#以上這個腳本定義了刪除對應兩個目錄中的文件,保留最新的66個文件,可以將他寫到crontab中,設置為每天淩晨2點執行一次就可以了。


#zk log dir   del the zookeeper log
#logDir=
#ls -t $logDir/zookeeper.log.* | tail -n +$count | xargs rm -f
clear code

  其他方法:
  第二種:使用ZK的工具類PurgeTxnLog,它的實現了一種簡單的歷史文件清理策略,可以在這裏看一下他的使用方法 http://zookeeper.apache.org/doc/r3.4.6/zookeeperAdmin.html
  第三種:對於上面這個執行,ZK自己已經寫好了腳本,在bin/zkCleanup.sh中,所以直接使用這個腳本也是可以執行清理工作的。
  第四種:從3.4.0開始,zookeeper提供了自動清理snapshot和事務日誌的功能,通過配置 autopurge.snapRetainCount 和 autopurge.purgeInterval 這兩個參數能夠實現定時清理了。這兩個參數都是在zoo.cfg中配置的:
autopurge.purgeInterval 這個參數指定了清理頻率,單位是小時,需要填寫一個1或更大的整數,默認是0,表示不開啟自己清理功能。
autopurge.snapRetainCount 這個參數和上面的參數搭配使用,這個參數指定了需要保留的文件數目。默認是保留3個。
  推薦使用第一種方法,對於運維人員來說,將日誌清理工作獨立出來,便於統一管理也更可控。畢竟zk自帶的一些工具並不怎麽給力。

4.啟動服務並查看

#進入到Zookeeper的bin目錄下
cd /opt/zookeeper-3.4.8/bin
#啟動服務(每臺服務器都需要操作)
./zkServer.sh start
#檢查服務器狀態
./zkServer.sh status

zookeeper的常用命令:啟動Zookeeper服務: bin/zkServer.sh start
           查看服務狀態,包括節點類型: bin/zkServer.sh status
           停止服務: bin/zkServer.sh stop
           重啟服務: bin/zkServer.sh restart

Kafka安裝與環境搭建

1.下載軟件包

  到Kafka相關的鏡像網http://mirrors.hust.edu.cn/apache/kafka/0.9.0.1/下載軟件包,例如:

1 wget "http://mirrors.hust.edu.cn/apache/kafka/0.9.0.1/kafka_2.11-0.9.0.1.tgz"
2 mv kafka_2.11-0.9.0.1 /opt/   
3 tar -zxvf kafka_2.11-0.9.0.1.tgz

2.修改配置文件

  進入到config目錄

cd /opt/kafka_2.11-0.9.0.1/config/

  主要關註:server.properties 這個文件即可,我們可以發現在目錄下有很多文件,這裏可以發現有Zookeeper文件,我們可以根據Kafka內帶的zk集群來啟動,但是建議使用獨立的zk集群

  使用命令 vim server.properties 修改配置參數如下:

broker.id=0  #每臺服務器的broker.id都不能相同,為依次增長的0、1、2、3、4

port=9092
#hostname
host.name=192.168.7.100

log.dirs=/opt/kafka_2.11-0.9.0.1/logs    #日誌地址
#在log.retention.hours=168 下面新增下面三項
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#設置zookeeper的連接端口,各節點以逗號分開
zookeeper.connect=192.168.7.100:2181,192.168.7.101:2181,192.168.7.107:2181

  配置文件解釋:

broker.id=0  #當前機器在集群中的唯一標識,和zookeeper的myid性質一樣
port=19092 #當前kafka對外提供服務的端口默認是9092
host.name=192.168.7.100 #這個參數默認是關閉的,在0.8.1有個bug,DNS解析問題,失敗率的問題。
num.network.threads=3 #這個是borker進行網絡處理的線程數
num.io.threads=8 #這個是borker進行I/O處理的線程數
log.dirs=/opt/kafka/kafkalogs/ #消息存放的目錄,這個目錄可以配置為“,”逗號分割的表達式,上面的num.io.threads要大於這個目錄的個數這個目錄,如果配置多個目錄,新創建的topic他把消息持久化的地方是,當前以逗號分割的目錄中,那個分區數最少就放那一個
socket.send.buffer.bytes=102400 #發送緩沖區buffer大小,數據不是一下子就發送的,先回存儲到緩沖區了到達一定的大小後在發送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收緩沖區大小,當數據到達一定大小後在序列化到磁盤
socket.request.max.bytes=104857600 #這個參數是向kafka請求消息或者向kafka發送消息的請請求的最大數,這個值不能超過java的堆棧大小
num.partitions=1 #默認的分區數,一個topic默認1個分區數
log.retention.hours=168 #默認消息的最大持久化時間,168小時,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本數,如果一個副本失效了,另一個還可以繼續提供服務
replica.fetch.max.bytes=5242880  #取消息的最大直接數
log.segment.bytes=1073741824 #這個參數是:因為kafka的消息是以追加的形式落地到文件,當超過這個值的時候,kafka會新起一個文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去檢查上面配置的log失效時間(log.retention.hours=168 ),到目錄查看是否有過期的消息如果有,刪除
log.cleaner.enable=false #是否啟用log壓縮,一般不用啟用,啟用的話可以提高性能
zookeeper.connect=192.168.7.100:12181,192.168.7.101:12181,192.168.7.107:1218 #設置zookeeper的連接端口

3.啟動Kafka集群並測試

  先要確保zookeeper已啟動,然後在Kafka目錄執行:

nohup bin/kafka-server-start.sh config/server.properties&

  如果無報錯則說明啟動成功。nohup &是實現在後臺啟動。

4.啟動生產者和消費者

  打開2個終端,分別在Kafka目錄執行以下命令

  啟動producer

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

  啟動consumer

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

  這裏的test就是我們取的topic名

  在producer的命令行輸入任意字符,觀察consumer是否能正確接收。

Kafka集群搭建