1. 程式人生 > >kafka 服務端消費者和生產者的配置

kafka 服務端消費者和生產者的配置

在kafka的安裝目錄下,config目錄下有個名字叫做producer.properties的配置檔案

#指定kafka節點列表,用於獲取metadata,不必全部指定
#需要kafka的伺服器地址,來獲取每一個topic的分片數等元資料資訊。
metadata.broker.list=kafka01:9092,kafka02:9092,kafka03:9092

#生產者生產的訊息被髮送到哪個block,需要一個分組策略。
#指定分割槽處理類。預設kafka.producer.DefaultPartitioner,表通過key雜湊到對應分割槽
#partitioner.class=kafka.producer.DefaultPartitioner

#生產者生產的訊息可以通過一定的壓縮策略(或者說壓縮演算法)來壓縮。訊息被壓縮後傳送到broker叢集,
#而broker叢集是不會進行解壓縮的,broker叢集只會把訊息傳送到消費者叢集,然後由消費者來解壓縮。
#是否壓縮,預設0表示不壓縮,1表示用gzip壓縮,2表示用snappy壓縮。
#壓縮後訊息中會有頭來指明訊息壓縮型別,故在消費者端訊息解壓是透明的無需指定。
#文字資料會以1比10或者更高的壓縮比進行壓縮。
compression.codec
=none #指定序列化處理類,訊息在網路上傳輸就需要序列化,它有String、陣列等許多種實現。 serializer.class=kafka.serializer.DefaultEncoder #如果要壓縮訊息,這裡指定哪些topic要壓縮訊息,預設empty,表示不壓縮。 #如果上面啟用了壓縮,那麼這裡就需要設定 #compressed.topics= #這是訊息的確認機制,預設值是0。在面試中常被問到。 #producer有個ack引數,有三個值,分別代表: #(1)不在乎是否寫入成功; #(2)寫入leader成功; #(3)寫入leader和所有副本都成功; #要求非常可靠的話可以犧牲效能設定成最後一種。 #為了保證訊息不丟失,至少要設定為1,也就 #是說至少保證leader將訊息儲存成功。 #設定傳送資料是否需要服務端的反饋,有三個值0,
1,-1,分別代表3種狀態: #0: producer不會等待broker傳送ack。生產者只要把訊息傳送給broker之後,就認為傳送成功了,這是第1種情況; #1: 當leader接收到訊息之後傳送ack。生產者把訊息傳送到broker之後,並且訊息被寫入到本地檔案,才認為傳送成功,這是第二種情況;#-1: 當所有的follower都同步訊息成功後傳送ack。不僅是主的分割槽將訊息儲存成功了, #而且其所有的分割槽的副本數也都同步好了,才會被認為發動成功,這是第3種情況。 request.required.acks=0 #broker必須在該時間範圍之內給出反饋,否則失敗。 #在向producer傳送ack之前,broker允許等待的最大時間 ,如果超時, #broker將會向producer傳送一個error ACK.意味著上一次訊息因為某種原因 #未能成功(比如follower未能同步成功) request.timeout.ms
=10000 #生產者將訊息傳送到broker,有兩種方式,一種是同步,表示生產者傳送一條,broker就接收一條; #還有一種是非同步,表示生產者積累到一批的訊息,裝到一個池子裡面快取起來,再發送給broker, #這個池子不會無限快取訊息,在下面,它分別有一個時間限制(時間閾值)和一個數量限制(數量閾值)的引數供我們來設定。 #一般我們會選擇非同步。 #同步還是非同步傳送訊息,預設“sync”表同步,"async"表非同步。非同步可以提高發送吞吐量, #也意味著訊息將會在本地buffer中,並適時批量傳送,但是也可能導致丟失未傳送過去的訊息 producer.type=sync #在async模式下,當message被快取的時間超過此值後,將會批量傳送給broker, #預設為5000ms #此值和batch.num.messages協同工作. queue.buffering.max.ms = 5000 #非同步情況下,快取中允許存放訊息數量的大小。 #在async模式下,producer端允許buffer的最大訊息量 #無論如何,producer都無法儘快的將訊息傳送給broker,從而導致訊息在producer端大量沉積 #此時,如果訊息的條數達到閥值,將會導致producer端阻塞或者訊息被拋棄,預設為10000條訊息。 queue.buffering.max.messages=20000 #如果是非同步,指定每次批量傳送資料量,預設為200 batch.num.messages=500 #在生產端的緩衝池中,訊息傳送出去之後,在沒有收到確認之前,該緩衝池中的訊息是不能被刪除的, #但是生產者一直在生產訊息,這個時候緩衝池可能會被撐爆,所以這就需要有一個處理的策略。 #有兩種處理方式,一種是讓生產者先別生產那麼快,阻塞一下,等會再生產;另一種是將緩衝池中的訊息清空。 #當訊息在producer端沉積的條數達到"queue.buffering.max.meesages"後阻塞一定時間後, #佇列仍然沒有enqueue(producer仍然沒有傳送出任何訊息) #此時producer可以繼續阻塞或者將訊息拋棄,此timeout值用於控制"阻塞"的時間 #-1: 不限制阻塞超時時間,讓produce一直阻塞,這個時候訊息就不會被拋棄 #0: 立即清空佇列,訊息被拋棄 queue.enqueue.timeout.ms=-1 #當producer接收到error ACK,或者沒有接收到ACK時,允許訊息重發的次數 #因為broker並沒有完整的機制來避免訊息重複,所以當網路異常時(比如ACK丟失) #有可能導致broker接收到重複的訊息,預設值為3. message.send.max.retries=3 #producer重新整理topic metada的時間間隔,producer需要知道partition leader #的位置,以及當前topic的情況 #因此producer需要一個機制來獲取最新的metadata,當producer遇到特定錯誤時, #將會立即重新整理 #(比如topic失效,partition丟失,leader失效等),此外也可以通過此引數來配置 #額外的重新整理機制,預設值600000 topic.metadata.refresh.interval.ms=60000

kafka的消費者配置(路徑和生產者配置檔案路徑相同),名字叫做.consumer.properties

#消費者叢集通過連線Zookeeper來找到broker。
#zookeeper連線伺服器地址
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181

#zookeeper的session過期時間,預設5000ms,用於檢測消費者是否掛掉
zookeeper.session.timeout.ms=5000

#當消費者掛掉,其他消費者要等該指定時間才能檢查到並且觸發重新負載均衡
zookeeper.connection.timeout.ms=10000

#這是一個時間閾值。
#指定多久消費者更新offset到zookeeper中。
#注意offset更新時基於time而不是每次獲得的訊息。
#一旦在更新zookeeper發生異常並重啟,將可能拿到已拿到過的訊息
zookeeper.sync.time.ms=2000

#指定消費
group.id=xxxxx

#這是一個數量閾值,經測試是500條。
#當consumer消費一定量的訊息之後,將會自動向zookeeper提交offset資訊#注意offset資訊並不是每消費一次訊息就向zk提交
#一次,而是現在本地儲存(記憶體),並定期提交,預設為true
auto.commit.enable=true

# 自動更新時間。預設60 * 1000
auto.commit.interval.ms=1000

# 當前consumer的標識,可以設定,也可以有系統生成,
#主要用來跟蹤訊息消費情況,便於觀察
conusmer.id=xxx

# 消費者客戶端編號,用於區分不同客戶端,預設客戶端程式自動產生
client.id=xxxx

# 最大取多少塊快取到消費者(預設10)
queued.max.message.chunks=50

# 當有新的consumer加入到group時,將會reblance,此後將會
#有partitions的消費端遷移到新  的consumer上,如果一個
#consumer獲得了某個partition的消費許可權,那麼它將會向zk
#註冊 "Partition Owner registry"節點資訊,但是有可能
#此時舊的consumer尚沒有釋放此節點, 此值用於控制,
#註冊節點的重試次數.
rebalance.max.retries=5

#每拉取一批訊息的最大位元組數
#獲取訊息的最大尺寸,broker不會像consumer輸出大於
#此值的訊息chunk 每次feth將得到多條訊息,此值為總大小,
#提升此值,將會消耗更多的consumer端記憶體
fetch.min.bytes=6553600

#當訊息的尺寸不足時,server阻塞的時間,如果超時,
#訊息將立即傳送給consumer
#資料一批一批到達,如果每一批是10條訊息,如果某一批還
#不到10條,但是超時了,也會立即傳送給consumer。
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360

# 如果zookeeper沒有offset值或offset值超出範圍。
#那麼就給個初始的offset。有smallest、largest、
#anything可選,分別表示給當前最小的offset、
#當前最大的offset、拋異常。預設largest
auto.offset.reset=smallest

# 指定序列化處理類
derializer.class=kafka.serializer.DefaultDecoder

以上內容轉載自:https://www.cnblogs.com/jun1019/p/6256371.html

 

硬體的選擇

1.磁碟吞吐量

  生產者客戶端的效能直接受到伺服器端磁碟吞吐量的影響。生產者生成的訊息必須被提交到伺服器儲存,大多數客戶端在傳送訊息後會一直等待,直到至少有一個伺服器確認訊息已經提交成功為止。也就是說,磁碟寫入速度越快,生成訊息的延遲就越低。機械硬碟(HDD)和固態盤(SSD)。固態盤的查詢和訪問速度都很快,提供了最好的效能。機械盤更便宜,單塊容量也更大。在同一個伺服器上使用多個機械盤,可以設定多個數據目錄,或者把它們設定成磁碟陣列,這樣可以提升機械硬碟的效能。

2.磁碟容量

  磁碟容量要多大取決於需要儲存的訊息的數量。

3.記憶體

  伺服器端的記憶體容量是影響客戶端效能的主要因素,磁碟效能影響生產者,而記憶體影響消費者。消費者一般從分割槽尾部讀取訊息,如果有生產者存在,就緊跟在生產者後面。這種情況下,消費者讀的訊息會直接存放在系統的頁面快取裡,這比從磁碟上重新讀取要快。不建議把kafka同其他重要的應用部署在一起,因為它們需要共享頁面快取,最終會降低kafka消費者的效能。

4.網路

  網路吞吐量決定了kafka能夠處理的最大資料流量。它和磁碟效能是制約kafka擴充套件規模的主要因素。kafka支援多個消費者,造成流入和流出的網路流量不平衡,從而讓情況變得更加複雜。對於給定的主題,一個生產者可能每秒中寫入1MB資料,但可能同時有多個消費者瓜分網路流量。其它操作也會佔用網路流量。

5.CPU

  kafka對計算處理能力的要求較低,不過他在一定程度上還是會影響整體效能。客戶端為了優化網路和磁碟空間,會對訊息進行壓縮。伺服器需要對訊息進行批量解壓,設定偏移量,然後重新進行批量壓縮,再儲存到磁碟上。這就是kafka對計算能力有所要求的地方。

  使用kafka叢集的最大好處就是可以跨伺服器進行負載均衡,再則就是可以使用複製功能來避免因單點故障造成的資料丟失。在維護kafka或底層系統時,使用叢集可以確保為客戶端提供高可用性。