1. 程式人生 > >kafka實戰 - 處理大文件需要註意的配置參數

kafka實戰 - 處理大文件需要註意的配置參數

了解 自定義 等於 副本 lead lar 做到 0.10 新的

概述

  kafka配置參數有很多,可以做到高度自定義。但是很多用戶拿到kafka的配置文件後,基本就是配置一些host,port,id之類的信息,其他的配置項采用默認配置,就開始使用了。這些默認配置是經過kafka官方團隊經過嚴謹寬泛的測試之後,求到的最優值。在單條信息很小,大部分場景下都能得到優異的性能。但是如果想使用kafka存儲一些比較大的,比如100M以上的數據,這些默認的配置參數就會出現各種各樣的問題。

  我們的業務是數據大小沒有什麽規律,小的只有幾kb,大的可能有幾百M。為了使得整體架構簡潔統一,降低維護成本,這些大小各異的樣本都需要流經kafka。這就要求把kafka的一些默認配置自定義,才能正確運行。這些配置可以分為3大塊,producer端的, broker端的,consumer端的。使用的kafka為0.10.2.0,以下的討論也只在這個版本做過測試。producer和consumer均使用php client rdkafka。

broker端的配置

  message.max.bytes,默認是1M。決定了broker可以接受多大的數據。如果采用默認配置,producer生產1M以上的數據都會被broker丟掉。所以這個參數需要設置為單條消息的最大大小。和這個參數相關的還有一個topic級別的max.message.bytes,其實它和messafe.max.bytes是一個功能,只是針對topic的設置,只對單個topic有作用,不會影響到其他topic(其他topic仍然使用message.max.bytes)。

  replica.fetch.max.bytes, 默認也是1M。這個參數的描述是replica的FETCH請求從每個partition獲取數據的最大大小。如果把message.max.bytes設置為100M,那topic中就會有100M大小的數據。但是replica的FETCH請求最大大小卻是默認的1M。這樣造成的後果就是producer雖然成功了,但是數據沒法復制出去,kafka的備份功能就形同虛設了。但是剛才說的問題只有在0.8.2.x及以前的版本才會出現。在0.8.2.x之後的版本,即便replica.fetch.max.bytes采用默認值,也可以進行復制。FETCH請求是批量進行的,replica會發過來類似的請求 "topic_name : [ partition1, partition2, partition3 ]" 來進行批量復制。在0.8.2.x及以前版本中,如果replica.fetch.max.bytes小於碰到的第一條數據,那leader_broker會返回錯誤,而replica會不斷重試,但是永遠也成功不了,造成的後果就是broker之前的流量暴增,影響到真正有用的邏輯,但是實際上傳輸的都是重試信息。在0.8.2.x之後,這個bug被修復,如果replica.fetch.max.bytes小於碰到的第一條數據,會停止去其他的partition繼續獲取數據,直接把這條數據返回。可以明顯地看到,功能雖然保住了,但是可能會造成如下2個問題:

    <1> 批量復制退化成單條復制。假設有broker1和broker2,broker2復制broker1。如果broker1上面有很多partition,那復制的過程就是一個一個partition地復制,效率可想而知。

    <2> 假設partition1增長地很快,而且單條消息都超過了 replica.fetch.max.bytes。但是partition2和partition3增長地沒有partition1快。那麽每次都只會直接返回partition1的第一條需要復制的數據,partition2和partition3的永遠都沒有機會復制。

  不過第二個問題官網上說已經被解決了,會把請求復制中partition的順序隨機打亂,讓每個partition都有機會成為第一個被復制的partition。但是筆者沒有做過測試,是否真地解決了還不是很清楚。

  所以綜合第一個和第二個問題,這個參數還是手動設置一下比較好,設置為比message.max.bytes稍大一些。這樣批量復制退化為單條復制這種問題會在很大程度上緩解,而且第二個問題也不會再出現。

  相關的討論可以在這裏找到:

    https://blog.csdn.net/guoyuqi0554/article/details/48630907

    https://issues.apache.org/jira/browse/KAFKA-1756

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

  log.segment.bytes,默認是1G。確保這個值大於單條數據的最大大小即可。

  bin目錄下的kafka-run-class.sh中需要配置的參數,Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM。kafka本身運行在JVM上,如果設置的replica.fetch.max.bytes很大,或者partition很多,則需要調整-Xms2048m, -Xmx2048m和--XX:MaxDirectMemorySize=8192m, 前2者調的過小,kafka會出現java.lang.OutOfMemoryError: Java heap space的錯誤。第3個配置的太小,kafka會出現java.lang.OutOfMemoryError: Direct buffer memory的錯誤。

producer端的配置

  message.max.byte, 最大可發送長度。如果這個配置小於當前要發送的單個數據的大小,代碼會直接拋異常Uncaught exception ‘RdKafka\Exception‘ with message ‘Broker: Message size too large‘,請求也不會發送到broker那裏。

  socket.timeout.ms, 默認為60000ms,網絡請求的超時時間。

  message.timeout.ms,默認為300000ms,消息發送timeout。client的send調用返回並不一定是已經把消息發送出去了。client這一端其實會攢buffer,然後批量發。一個消息如果在特定時間(min(socket.timeout.ms, message.timeout.ms ))內沒有被發出去,那麽當回調被調用時,會得到“time out”的錯誤。這個參數和上面的socket.timeout.ms在網絡情況不好,或者發送數據非常大的時候需要設置一下。不過一般的工作環境是在內網,使用默認配置一般不會出現什麽問題。

  

consumer端的配置

  fetch.max.bytes, 這個參數決定了可以成功消費到的最大數據。比如這個參數設置的是10M,那麽consumer能成功消費10M以下的數據,但是最終會卡在消費大於10M的數據上無限重試。fetch.max.bytes一定要設置到大於等於最大單條數據的大小才行。

  receive.message.max.bytes,一般在C/S架構下,C和S都是通過一種特殊的協議進行通信的,kafka也不例外。fetch.max.bytes決定的只是response中純數據的大小,而kafka的FETCH協議最大會有512字節的協議頭,所以這個參數一般被設置為fetch.max.bytes+512。 

  session.timeout.ms,默認是10000ms,會話超時時間。當我們使用consumer_group的模式進行消費時,kafka如果檢測到某個consumer掛掉,就會記性rebalance。consumer每隔一段時間(heartbeat.interval.ms)給broker發送心跳消息,如果超過這個時間沒有發送,broker就會認為這個consumer掛了。這個參數的有效取值範圍是broker端的設置group.min.session.timeout.ms(6000)和group.max.session.timeout.ms(300000)之間。

  max.poll.interval.ms, 默認是300000ms,也是檢測consumer失效的timeout,這個timeout針對地是consumer連續2次從broker消費消息的時間間隔。為什麽有了session.timeout.ms又要引入max.poll.interval.ms? 在kafka 0.10.0 之前,consumer消費消息和發送心跳信息這兩個功能是在一個線程中進行的。這樣就會引發一個問題,如果某條數據process的時間較長,那麽consumer就無法給broker發送心跳信息,broker就會認為consumer死了。所以不得不提升session.timeout.ms來解決這個問題。但是這又引入了另外一個問題,如果session.timeout.ms設置得很大,那麽檢測一個consumer掛掉的時間就會很長,如果業務是實時的,那這就是不能忍受的。所以在 0.10.0 之後,發送心跳信息這個功能被拎出來在單獨的線程中做,session.timeout.ms就是針對這個線程到底能不能按時發送心跳的。但是如果這個線程運行正常,但是消費線程掛了呢?這就無法檢測了啊。所以就引進了max.poll.interval.ms,用來解決這個問題。所以如果使用比較新的producer庫,恰好有些數據處理時間比較長,就可以適當增加這個參數的值。但是這個配置在php的client沒有找到,應該是不支持。具體怎麽實現這個參數的功能,還有待學習更新。但是java client可以配置這個參數。

  關於max.poll.interval.ms的討論:https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0

確保數據安全性的配置

  producer端可以配置一個叫acks的參數。代表的是broker再向producer返回寫入成功的response時,需要確保寫入ISR broker的個數。0表示broker不用返回response,1表示broker寫入leader後即返回,-1表示broker寫入所有ISR後返回。

   如果想讓數據丟失的可能性降到最小,就設置副本數為多個,acks=-1。但是一般的做法是另acks稍微小於副本數。比如有3個副本,設置acks為2。那麽broker只需要寫入leader和另外一個副本就可以給producer返回response。這樣做的好處是,系統整體寫入延遲決定於最快的2個broker。寫入leadre,剩下的2個副本只要有1個返回成功,leader即可向producer返回寫入成功。

   但是有一個問題,如果ISR中,除了leader,剩下的副本全掛了怎麽辦?這樣即便我們設置acks=-1, 也只是寫入leader就返回,我們什麽都不知道,還以為是寫入了所有的副本才返回寫入成功的。為了解決這個問題,kafka在broker端引入了一個配置,min.insync.replicas。如果acks設置為-1,但是寫入ISR的個數小於min.insync.replicas配置的個數,則producer代碼會拋出NotEnoughReplicas讓開發人員指導出現了問題。

kafka實戰 - 處理大文件需要註意的配置參數