1. 程式人生 > >kafka實戰 - 處理大檔案需要注意的配置引數

kafka實戰 - 處理大檔案需要注意的配置引數

概述

  kafka配置引數有很多,可以做到高度自定義。但是很多使用者拿到kafka的配置檔案後,基本就是配置一些host,port,id之類的資訊,其他的配置項採用預設配置,就開始使用了。這些預設配置是經過kafka官方團隊經過嚴謹寬泛的測試之後,求到的最優值。在單條資訊很小,大部分場景下都能得到優異的效能。但是如果想使用kafka儲存一些比較大的,比如100M以上的資料,這些預設的配置引數就會出現各種各樣的問題。

  我們的業務是資料大小沒有什麼規律,小的只有幾kb,大的可能有幾百M。為了使得整體架構簡潔統一,降低維護成本,這些大小各異的樣本都需要流經kafka。這就要求把kafka的一些預設配置自定義,才能正確執行。這些配置可以分為3大塊,producer端的, broker端的,consumer端的。使用的kafka為0.10.2.0,以下的討論也只在這個版本做過測試。producer和consumer均使用php client rdkafka。

 

broker端的配置

  message.max.bytes,預設是1M。決定了broker可以接受多大的資料。如果採用預設配置,producer生產1M以上的資料都會被broker丟掉。所以這個引數需要設定為單條訊息的最大大小。和這個引數相關的還有一個topic級別的max.message.bytes,其實它和messafe.max.bytes是一個功能,只是針對topic的設定,只對單個topic有作用,不會影響到其他topic(其他topic仍然使用message.max.bytes)。

  replica.fetch.max.bytes, 預設也是1M。這個引數的描述是replica的FETCH請求從每個partition獲取資料的最大大小。如果把message.max.bytes設定為100M,那topic中就會有100M大小的資料。但是replica的FETCH請求最大大小卻是預設的1M。這樣造成的後果就是producer雖然成功了,但是資料沒法複製出去,kafka的備份功能就形同虛設了。但是剛才說的問題只有在0.8.2.x及以前的版本才會出現。在0.8.2.x之後的版本,即便replica.fetch.max.bytes採用預設值,也可以進行復制。FETCH請求是批量進行的,replica會發過來類似的請求 "topic_name : [ partition1, partition2, partition3 ]" 來進行批量複製。在0.8.2.x及以前版本中,如果replica.fetch.max.bytes小於碰到的第一條資料,那leader_broker會返回錯誤,而replica會不斷重試,但是永遠也成功不了,造成的後果就是broker之前的流量暴增,影響到真正有用的邏輯,但是實際上傳輸的都是重試資訊。在0.8.2.x之後,這個bug被修復,如果replica.fetch.max.bytes小於碰到的第一條資料,會停止去其他的partition繼續獲取資料,直接把這條資料返回。可以明顯地看到,功能雖然保住了,但是可能會造成如下2個問題:

    <1> 批量複製退化成單條複製。假設有broker1和broker2,broker2複製broker1。如果broker1上面有很多partition,那複製的過程就是一個一個partition地複製,效率可想而知。

    <2> 假設partition1增長地很快,而且單條訊息都超過了 replica.fetch.max.bytes。但是partition2和partition3增長地沒有partition1快。那麼每次都只會直接返回partition1的第一條需要複製的資料,partition2和partition3的永遠都沒有機會複製。

  不過第二個問題官網上說已經被解決了,會把請求複製中partition的順序隨機打亂,讓每個partition都有機會成為第一個被複制的partition。但是筆者沒有做過測試,是否真地解決了還不是很清楚。

  所以綜合第一個和第二個問題,這個引數還是手動設定一下比較好,設定為比message.max.bytes稍大一些。這樣批量複製退化為單條複製這種問題會在很大程度上緩解,而且第二個問題也不會再出現。

  相關的討論可以在這裡找到:

    https://blog.csdn.net/guoyuqi0554/article/details/48630907

    https://issues.apache.org/jira/browse/KAFKA-1756

    https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes

  log.segment.bytes,預設是1G。確保這個值大於單條資料的最大大小即可。

  bin目錄下的kafka-run-class.sh中需要配置的引數,Brokers allocate a buffer the size of replica.fetch.max.bytes for each partition they replicate. If replica.fetch.max.bytes is set to 1 MiB, and you have 1000 partitions, about 1 GiB of RAM is required. Ensure that the number of partitions multiplied by the size of the largest message does not exceed available memory.The same consideration applies for the consumer fetch.message.max.bytes setting. Ensure that you have enough memory for the largest message for each partition the consumer replicates. With larger messages, you might need to use fewer partitions or provide more RAM。kafka本身執行在JVM上,如果設定的replica.fetch.max.bytes很大,或者partition很多,則需要調整-Xms2048m, -Xmx2048m和--XX:MaxDirectMemorySize=8192m, 前2者調的過小,kafka會出現java.lang.OutOfMemoryError: Java heap space的錯誤。第3個配置的太小,kafka會出現java.lang.OutOfMemoryError: Direct buffer memory的錯誤。

 

producer端的配置

  message.max.byte, 最大可傳送長度。如果這個配置小於當前要傳送的單個數據的大小,程式碼會直接拋異常Uncaught exception 'RdKafka\Exception' with message 'Broker: Message size too large',請求也不會發送到broker那裡。

  socket.timeout.ms, 預設為60000ms,網路請求的超時時間。

  message.timeout.ms,預設為300000ms,訊息傳送timeout。client的send呼叫返回並不一定是已經把訊息傳送出去了。client這一端其實會攢buffer,然後批量發。一個訊息如果在特定時間(min(socket.timeout.ms, message.timeout.ms ))內沒有被髮出去,那麼當回撥被呼叫時,會得到“time out”的錯誤。這個引數和上面的socket.timeout.ms在網路情況不好,或者傳送資料非常大的時候需要設定一下。不過一般的工作環境是在內網,使用預設配置一般不會出現什麼問題。

  

consumer端的配置

  fetch.max.bytes, 這個引數決定了可以成功消費到的最大資料。比如這個引數設定的是10M,那麼consumer能成功消費10M以下的資料,但是最終會卡在消費大於10M的資料上無限重試。fetch.max.bytes一定要設定到大於等於最大單條資料的大小才行。

  receive.message.max.bytes,一般在C/S架構下,C和S都是通過一種特殊的協議進行通訊的,kafka也不例外。fetch.max.bytes決定的只是response中純資料的大小,而kafka的FETCH協議最大會有512位元組的協議頭,所以這個引數一般被設定為fetch.max.bytes+512。 

  session.timeout.ms,預設是10000ms,會話超時時間。當我們使用consumer_group的模式進行消費時,kafka如果檢測到某個consumer掛掉,就會記性rebalance。consumer每隔一段時間(heartbeat.interval.ms)給broker傳送心跳訊息,如果超過這個時間沒有傳送,broker就會認為這個consumer掛了。這個引數的有效取值範圍是broker端的設定group.min.session.timeout.ms(6000)和group.max.session.timeout.ms(300000)之間。

  max.poll.interval.ms, 預設是300000ms,也是檢測consumer失效的timeout,這個timeout針對地是consumer連續2次從broker消費訊息的時間間隔。為什麼有了session.timeout.ms又要引入max.poll.interval.ms? 在kafka 0.10.0 之前,consumer消費訊息和傳送心跳資訊這兩個功能是在一個執行緒中進行的。這樣就會引發一個問題,如果某條資料process的時間較長,那麼consumer就無法給broker傳送心跳資訊,broker就會認為consumer死了。所以不得不提升session.timeout.ms來解決這個問題。但是這又引入了另外一個問題,如果session.timeout.ms設定得很大,那麼檢測一個consumer掛掉的時間就會很長,如果業務是實時的,那這就是不能忍受的。所以在 0.10.0 之後,傳送心跳資訊這個功能被拎出來在單獨的執行緒中做,session.timeout.ms就是針對這個執行緒到底能不能按時傳送心跳的。但是如果這個執行緒執行正常,但是消費執行緒掛了呢?這就無法檢測了啊。所以就引進了max.poll.interval.ms,用來解決這個問題。所以如果使用比較新的producer庫,恰好有些資料處理時間比較長,就可以適當增加這個引數的值。但是這個配置在php的client沒有找到,應該是不支援。具體怎麼實現這個引數的功能,還有待學習更新。但是java client可以配置這個引數。

  關於max.poll.interval.ms的討論:https://stackoverflow.com/questions/39730126/difference-between-session-timeout-ms-and-max-poll-interval-ms-for-kafka-0-10-0

 

確保資料安全性的配置

  producer端可以配置一個叫acks的引數。代表的是broker再向producer返回寫入成功的response時,需要確保寫入ISR broker的個數。0表示broker不用返回response,1表示broker寫入leader後即返回,-1表示broker寫入所有ISR後返回。

    如果想讓資料丟失的可能性降到最小,就設定副本數為多個,acks=-1。但是一般的做法是另acks稍微小於副本數。比如有3個副本,設定acks為2。那麼broker只需要寫入leader和另外一個副本就可以給producer返回response。這樣做的好處是,系統整體寫入延遲決定於最快的2個broker。寫入leadre,剩下的2個副本只要有1個返回成功,leader即可向producer返回寫入成功。

    但是有一個問題,如果ISR中,除了leader,剩下的副本全掛了怎麼辦?這樣即便我們設定acks=-1, 也只是寫入leader就返回,我們什麼都不知道,還以為是寫入了所有的副本才返回寫入成功的。為了解決這個問題,kafka在broker端引入了一個配置,min.insync.replicas。如果acks設定為-1,但是寫入ISR的個數小於min.insync.replicas配置的個數,則producer程式碼會丟擲NotEnoughReplicas讓開發人員指匯出現了問題。