1. 程式人生 > >kafka(06)——kafka配置檔案的說明

kafka(06)——kafka配置檔案的說明

server.properties的配置說明

#broker的全域性唯一編號,不能重複
broker.id=0

#用來監聽連結的埠,producer或consumer將在此埠建立連線
port=9092

#處理網路請求的執行緒數量
num.network.threads=3

#用來處理磁碟IO的執行緒數量
num.io.threads=8

#傳送套接字的緩衝區大小
socket.send.buffer.bytes=102400

#接受套接字的緩衝區大小
socket.receive.buffer.bytes=102400

#請求套接字的緩衝區大小
socket.request.max.bytes=104857600

#kafka執行日誌存放的路徑,根據我們的磁碟路徑來定,多個磁碟用逗號隔開
log.dirs=/export/data/kafka/

#topic在當前broker上的分片個數,分割槽的數量值,一般都需要我們在建立topic的時候手動指定,覆蓋這個配置
num.partitions=2

#用來恢復和清理data下資料的執行緒數量
num.recovery.threads.per.data.dir=1

#segment檔案保留的最長時間,超時將被刪除
log.retention.hours=1

#滾動生成新的segment檔案的最大時間
log.roll.hours=1

#日誌檔案中每個segment的大小,預設為1G
log.segment.bytes=1073741824

#週期性檢查檔案大小的時間
log.retention.check.interval.ms=300000

#日誌清理是否開啟
log.cleaner.enable=true

#broker需要使用zookeeper儲存meta資料
zookeeper.connect=node01:2181,node02:2181,node03:2181

#zookeeper連結超時時間
zookeeper.connection.timeout.ms=6000

#partion buffer中,訊息的條數達到閾值,將觸發flush到磁碟
log.flush.interval.messages=10000

#訊息buffer的時間,達到閾值,將觸發flush到磁碟
log.flush.interval.ms=3000

#刪除topic需要server.properties中設定delete.topic.enable=true否則只是標記刪除
delete.topic.enable=true

#此處的host.name為本機IP(重要),如果不改,則客戶端會丟擲:Producer connection to localhost:9092 unsuccessful 錯誤!
host.name=node01
#如果我們需要通過公網連線我們的kafka的時候,才需要配置這個,一般我們的kafka不會放在公網裡面
#advertised.host.name=192.168.140.128

生產者配置檔案

#指定kafka節點列表,用於獲取metadata,不必全部指定
metadata.broker.list=node01:9092,node02:9092,node03:9092
# 指定分割槽處理類。預設kafka.producer.DefaultPartitioner,表通過key雜湊到對應分割槽
#partitioner.class=kafka.producer.DefaultPartitioner
# 是否壓縮,預設0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。壓縮後訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定。
compression.codec=none
# 指定序列化處理類
serializer.class=kafka.serializer.DefaultEncoder
# 如果要壓縮訊息,這裡指定哪些topic要壓縮訊息,預設empty,表示不壓縮。
#compressed.topics=

# 設定傳送資料是否需要服務端的反饋,有三個值0,1,-1
# 0: producer不會等待broker傳送ack 
# 1: 當leader接收到訊息之後傳送ack 
# -1: 當所有的follower都同步訊息成功後傳送ack. 
request.required.acks=1

# 在向producer傳送ack之前,broker允許等待的最大時間 ,如果超時,broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種原因未能成功(比如follower未能同步成功) 
request.timeout.ms=10000

# 同步還是非同步傳送訊息,預設“sync”表同步,"async"表非同步。非同步可以提高發送吞吐量,
也意味著訊息將會在本地buffer中,並適時批量傳送,但是也可能導致丟失未傳送過去的訊息
producer.type=async

# 在async模式下,當message被快取的時間超過此值後,將會批量傳送給broker,預設為5000ms
# 此值和batch.num.messages協同工作.
queue.buffering.max.ms = 5000

# 在async模式下,producer端允許buffer的最大訊息量
# 無論如何,producer都無法儘快的將訊息傳送給broker,從而導致訊息在producer端大量沉積
# 此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,預設為10000
queue.buffering.max.messages=20000

# 如果是非同步,指定每次批量傳送資料量,預設為200
batch.num.messages=500

# 當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"後 
# 阻塞一定時間後,佇列仍然沒有enqueue(producer仍然沒有傳送出任何訊息) 
# 此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用於控制"阻塞"的時間 
# -1: 無阻塞超時限制,訊息不會被拋棄 
# 0:立即清空佇列,訊息被拋棄 
queue.enqueue.timeout.ms=-1


# 當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數 
# 因為broker並沒有完整的機制來避免訊息重複,所以當網路異常時(比如ACK丟失) 
# 有可能導致broker接收到重複的訊息,預設值為3.
message.send.max.retries=3

# producer重新整理topic metada的時間間隔,producer需要知道partition leader的位置,以及當前topic的情況 
# 因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時,將會立即重新整理 
# (比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置額外的重新整理機制,預設值600000 
topic.metadata.refresh.interval.ms=60000

消費者配置檔案說明

# zookeeper連線伺服器地址
zookeeper.connect=node01:2181,node02:2181,node03:2181
# zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000
#當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000
# 指定多久消費者更新offset到zookeeper中。注意offset更新時基於time而不是每次獲得的訊息。一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的訊息
zookeeper.sync.time.ms=2000
#指定消費 
group.id=fgm
# 當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊 
# 注意offset資訊並不是每消費一次訊息就向zk提交一次,而是現在本地儲存(記憶體),並定期提交,預設為true
auto.commit.enable=true
# 自動更新時間。預設60 * 1000
auto.commit.interval.ms=1000
# 當前consumer的標識,可以設定,也可以有系統生成,主要用來跟蹤訊息消費情況,便於觀察
conusmer.id=xxx 
# 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生
client.id=xxxx
# 最大取多少塊快取到消費者(預設10)
queued.max.message.chunks=50
# 當有新的consumer加入到group時,將會reblance,此後將會有partitions的消費端遷移到新  的consumer上,如果一個consumer獲得了某個partition的消費許可權,那麼它將會向zk註冊 "Partition Owner registry"節點資訊,但是有可能此時舊的consumer尚沒有釋放此節點, 此值用於控制,註冊節點的重試次數. 
rebalance.max.retries=5

# 獲取訊息的最大尺寸,broker不會像consumer輸出大於此值的訊息chunk 每次feth將得到多條訊息,此值為總大小,提升此值,將會消耗更多的consumer端記憶體
fetch.min.bytes=6553600

# 當訊息的尺寸不足時,server阻塞的時間,如果超時,訊息將立即傳送給consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# 如果zookeeper沒有offset值或offset值超出範圍。那麼就給個初始的offset。有smallest、largest、anything可選,分別表示給當前最小的offset、當前最大的offset、拋異常。預設largest
auto.offset.reset=smallest
# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

在這裡插入圖片描述

配置檔案均在kafka的安裝包下的config目錄下:/export/servers/kafka_2.11-1.0.0/config 當然這麼多配置不用全都配的,根據自己的需求進行配置即可。