kafka生產者使用不當導致的應用掛起/夯住
[TOC]
1. 背景和現象
1.1 kafka版本和部署狀態
kafka版本
server和client都是0.11.0
部署狀態
kafka多個節點(具體多少不清楚,但是肯定不是單節點),zookeeper3個節點。topic的分割槽副本數為2。具備高可用。
1.2 事件現象
在一次生產事件中,其中一個kafka節點和zk節點因物理機宕機下線,zk和kafka broker恢復後,生產者應用並沒有恢復,最終無法傳送訊息。
此時生產者端的應用業務流程無法繼續執行,流程走到producer模組就被Block住,然後每隔10s報錯一次。
重啟producer之後,應用恢復。
關鍵日誌
2018-11-07 10:52:24,015 [kfkBolt-tbl_qqhis_sq_trans_flow_raw-thread-0] [com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter:65] [ERROR] - produce:fail at seco nd time. java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms. at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1057) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:764) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:609) at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter$1.onCompletion(KafkaProducerWriter.java:57) at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:760) at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:701) at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:48) at com.unionpay.cloudatlas.galaxy.services.streamService.writer.KafkaProducerWriter.doOnce(KafkaProducerWriter.java:27) at com.unionpay.cloudatlas.upstorm.component.SimpleBolt.doOnce(SimpleBolt.java:187) at com.unionpay.cloudatlas.upstorm.component.SimpleBolt$InnerThread.run(SimpleBolt.java:105) Caused by: org.apache.kafka.common.errors.TimeoutException: Failed to allocate memory within the configured max blocking time 10000 ms.
1.3 生產者程式碼和配置
生產者程式碼
@Override public DataRecord doOnce(Record record) { // TODO Auto-generated method stub try { OperCounter.getInstance().increment(Constant.KEY_KAFKA_RECEIVE); final ProducerRecord<String, Record> proRecord = new ProducerRecord<String, Record>(topicPub, record); producer.send(proRecord, new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { // TODO Auto-generated method stub // 傳送失敗 if (exception != null) { OperCounter.getInstance().increment(Constant.KEY_KAFKA_REIN); logger.warn("producer send fail and resend.", exception); try { producer.send(proRecord).get(); OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL); } catch (InterruptedException e) { // TODO Auto-generated catch block logger.error("produce:fail at second time.\t", e); OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT); } catch (ExecutionException e) { // TODO Auto-generated catch block logger.error("produce:fail at second time.\t", e); OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT); } catch (Exception e) { logger.error("produce:fail at second time.\t", e); OperCounter.getInstance().increment(Constant.KEY_KAFKA_REINJECT); } } else { OperCounter.getInstance().increment(Constant.KEY_KAFKA_DEAL); } } }); return null; } catch (TimeoutException e) { logger.error("produce:fail.\t", e); OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT); } catch (Exception e) { logger.error("produce:fail.\t", e); OperCounter.getInstance().increment(Constant.KEY_KAFKA_REJECT); } return null; }
生產者配置
bootstrap.servers=${KAFKA_SERVER_IN} key.serializer=org.apache.kafka.common.serialization.StringSerializer value.serializer=com.unionpay.cloudatlas.galaxy.common.protocol.kafka.RecordKryoSerializer #max time to wait,default to 60s max.block.ms=10000 batch.size=65536 buffer.memory=134217728 retries=3
通過上述程式碼和配置可以看出
- 最大block事件為1000ms,也就是10s
- buffer配置的較大,為134M
- 生產者先是非同步傳送,如果傳送失敗,則執行一次同步傳送
2. 問題初步定位和分析
2.1 kafka生產者簡介
排除服務端的疑點
在最終定位之前,我們懷疑過很多點,比如是不是kafka高可用存在bug、是不是zk出問題了、是不是kafka選主失敗等,最終通過生產的其他應用現象推論以及理論分析得出以下基本結論
- kafka本身高可用機制還是比較可靠的,宕機1臺節點,server的狀態可以快速回復正常
- zookeeper的高可用也沒有問題,3個節點的情況下,是允許1個節點下線的,zookeeper服務正常
- 宕機期間以及恢復後,kafka完成了leader節點的選舉
總的來說,就是不要懷疑服務端有問題。
當然,“不要懷疑服務端有問題”只是我們定位到了原因之後的後置結論,並不表示故障排查的時候忽略服務端的潛在問題,畢竟不管是硬體資源還是軟體質量都可能存在缺陷,尤其是開源產品的發展本身就是一個不斷迭代完善的過程。
關於kafka生產者
在說原因之前,還需要說明一下kafka的producer流程。kafka生產者傳送訊息的粗略流程如下:
- 首先應用呼叫send傳送
- 訊息的KV序列化
- 根據分割槽器決定訊息傳送到那個分割槽
- 將訊息新增到本地緩衝區,如果緩衝區滿,則當前執行緒block,直到緩衝區有足夠的空間或者達到最大阻塞時間(max.block.ms)
- 有一個獨立的IO執行緒負責從緩衝區中將訊息傳送到服務端
- IO執行緒收到響應之後,通知producer執行緒完成了傳送,如果需要,呼叫producer指定的回撥函式
注意,從上面的流程我們可以看出,在kafka的高版本客戶端(貌似是0.9之後)中,傳送訊息天然的是一個非同步的過程,也就是說,訊息傳送都是非同步方式進行的。而我們如果需要使用同步的方式傳送訊息,那麼我們只能通過KafkaProducer.send返回的Future物件完成,呼叫Future.get,關鍵程式碼如下
//KafkaProducer.send的方法簽名 //不提供回撥 public Future<RecordMetadata> send(ProducerRecord<K, V> record); //提供回撥 public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback)
當我們呼叫了Future.get的時候,我們做了什麼
上面出問題的程式碼,使用了同步的方式等待結果,那麼同步的get,到底是什麼樣的操作呢?
先來看下KafkaProducer.send返回的具體Future實現
KafkaProducer.doSend
RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs); if (result.batchIsFull || result.newBatchCreated) { log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition); this.sender.wakeup(); } return result.future;
上面的程式碼中返回的future即FutureRecordMetadata的例項,其實現了Future的get方法
public final class FutureRecordMetadata implements Future<RecordMetadata> { @Override public RecordMetadata get() throws InterruptedException, ExecutionException { //阻塞等待 this.result.await(); if (nextRecordMetadata != null) return nextRecordMetadata.get(); return valueOrError(); } @Override public RecordMetadata get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { // Handle overflow. long now = System.currentTimeMillis(); long deadline = Long.MAX_VALUE - timeout < now ? Long.MAX_VALUE : now + timeout; //阻塞等待 boolean occurred = this.result.await(timeout, unit); if (nextRecordMetadata != null) return nextRecordMetadata.get(deadline - System.currentTimeMillis(), TimeUnit.MILLISECONDS); if (!occurred) throw new TimeoutException("Timeout after waiting for " + TimeUnit.MILLISECONDS.convert(timeout, unit) + " ms."); return valueOrError(); } }
可以看到,get裡通過result.await阻塞等待,再看看這裡的result對應的類ProduceRequestResult
public final class ProduceRequestResult { private final CountDownLatch latch = new CountDownLatch(1); /** * Mark this request as complete and unblock any threads waiting on its completion. */ public void done() { if (baseOffset == null) throw new IllegalStateException("The method `set` must be invoked before this method."); this.latch.countDown(); } /** * Await the completion of this request */ public void await() throws InterruptedException { latch.await(); } }
可以看到,其內部的await中呼叫了CountDownLatch.await進行等待,同時提供了done方法,解除等待的狀態。
看到這裡就比較清晰了,如果應用通過get方式同步等待結果,其內部實現時使用了CountDownLatch的await方法,當結果返回的時候,IO執行緒會呼叫done方法結束等待狀態,並且返回結果。我們前面的分析只介紹瞭如何等待的,至於如何喚醒,將在下文介紹。
2.2 問題定位
通過上面的程式碼分析,我們幾乎可以猜測到問題的出現可能和這裡的設計有關——呼叫了get阻塞等待,但是由於某種原因,導致沒有人喚醒等待著的執行緒。
為了進一步驗證我們的想法,在開發環境復現生產事件的情況,當出現上述現象時,通過jstack抓一下執行緒快照,進一步證實了我們的猜想:
"kafka-producer-network-thread | PRODUCER_VERSION_UP_KAKFA_20181129_195652" daemon prio=10 tid=0x00007fd280253000 nid=0x20a5 waiting on condition [0x00007fd2879d8000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for<0x0000000785982e70> (a java.util.concurrent.CountDownLatch$Sync) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:834) at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:994) at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1303) at java.util.concurrent.CountDownLatch.await(CountDownLatch.java:236) at org.apache.kafka.clients.producer.internals.ProduceRequestResult.await(ProduceRequestResult.java:76) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:61) at org.apache.kafka.clients.producer.internals.FutureRecordMetadata.get(FutureRecordMetadata.java:29) at com.unionpay.arch.bigdata.test.BigDataUPKafkaProducer$MyCallback.onCompletion(BigDataUPKafkaProducer.java:60) at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:201) at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:185) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:599) at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:575) at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:539) at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:474) at org.apache.kafka.clients.producer.internals.Sender.access$100(Sender.java:75) at org.apache.kafka.clients.producer.internals.Sender$1.onComplete(Sender.java:660) at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:454) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:446) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162) at java.lang.Thread.run(Thread.java:745)
這裡是開發環境復現的程式碼執行情況,因此Producer程式碼和生產上的不完全一致,但是邏輯相同。
通過執行緒快照可以發現,IO執行緒一致處於await狀態,因此後續流程無法執行!
關於CountDownLatch的設計和實現,感興趣的可以檢視相關文件。其實這裡說生產者應用被Block,嚴格意義上來說是不對的,執行緒狀態其實是WAITING,因此這裡的Block指的是“程式碼執行不下去,在當前狀態一直堵著”的狀態~
因此,問題定位如下
- 在非同步傳送的的回撥裡使用了同步的方式再次傳送,由於kafka producer的同步傳送是阻塞等待,且使用的是不帶超時時間的無限期等待(future.get()中未指定超時時間),因此當不被喚醒時會一直wai下去
- kafka生產者的IO執行緒(實際執行資料傳送的執行緒)是單執行緒模型,且回撥函式是在IO執行緒中執行的,因此回撥函式的阻塞會直接導致IO執行緒阻塞,於是生產者緩衝區的資料無法被髮送
- kafka生產者還在不斷的被應用呼叫,因此緩衝區一直累積並增大,當緩衝區滿的時候,生產者執行緒會被阻塞,最大阻塞時間為max.block.time,如果改時間到達之後還是無法將資料塞入緩衝區,則會丟擲一個異常,因此日誌中看到達到10s之後,打印出異常棧
- 由於使用了get沒有指定超時時間,且該await一直無法被喚醒,因此這種情況會一直持續,在沒有人工干預的情況下,永遠不會發送成功
生產建議
- kafka生產者推薦使用非同步方式傳送,並且提供回撥以響應傳送成功或者失敗
- 如果需要使用future.get的方式模擬同步傳送,則需要在get里加上合適的超時時間,避免因為不可預知的外部因素導致執行緒無法被喚醒,即使用Future.get(long timeout)的api而不是不帶超時引數的Future.get()
- 不要在非同步回撥中執行阻塞操作或者耗時比較久的操作,如果有必要可以考慮交給另一個執行緒(池)去做
3. Future.get為何沒有被喚醒
在前面的介紹中,我們定位了問題的原因,但也留下了一些疑問:
- 為何future.get沒有被喚醒?
- producer是何時執行了回撥操作的?
- 這種情況屬於應用使用不當還是kafka的bug?
3.1 HOW:分析思路
想要徹底弄清楚這個問題,恐怕要去好好擼一擼kafka producer的原始碼了。由於kafka producer的程式碼非常多,其中有緩衝區操作模組、IO執行模組、元資料更新模組、事務支援模組等很多設計,這裡就只從這次的事件問題切入分析,後面如果對kafka producer原始碼全面分析了之後再專門用幾篇文章描述。
那麼思路很簡單,主要從以下幾個方面入手
- 上一節中我們說到,造成wait的原因就是呼叫了CountDownLatch的await方法,那麼何處呼叫了CountDownLatch的countdown方法?
- 在所有呼叫了CountDownLatch.countdown的地方,是否包含了對kafka節點下線的處理?也就是說,難道kafka節點下線之後,流程就不會走到countdown了嗎?
為了弄清楚以上兩個問題,我們先去看看原始碼。通過對ProduceRequestResult的成員變數CountDownLatch latch分析可以知道,修改其狀態的方法只有2個await方法和一個done方法
/** * Mark this request as complete and unblock any threads waiting on its completion. */ public void done() { if (baseOffset == null) throw new IllegalStateException("The method `set` must be invoked before this method."); this.latch.countDown(); } /** * Await the completion of this request */ public void await() throws InterruptedException { latch.await(); } /** * Await the completion of this request (up to the given time interval) * @param timeout The maximum time to wait * @param unit The unit for the max time * @return true if the request completed, false if we timed out */ public boolean await(long timeout, TimeUnit unit) throws InterruptedException { return latch.await(timeout, unit); }
另外還有一個completed方法只是讀取狀態,不是修改,這裡就忽略了
其中兩個await方法一個是有時間引數的一個是沒有的,對應Future.get()和Future.get(long timeout),是導致阻塞的入口,因此也不用考慮,那麼重點就是這個done方法了。
3.2 WHEN:誰呼叫了done/什麼場景下會正常喚醒
通過eclipse提供的工具,可以一層一層追蹤出,有哪些地方呼叫了這個done,基本結論如下,
在Producer中 ,主要有設計到兩個邏輯(兩個類),其中
-
Sender,主要對於異常情況做做一些處理,以喚醒await的執行緒,包括
- 當連結被強制關閉時
- 當事務管理器中認為需要丟棄時
- 當有過期的資料時
-
NetworkClient,主要是處理髮送結果,包括
- 當傳送後返回失敗時
- 當返回訊息太大需要切分的時候
- 當傳送成功的時候
相應的邏輯和流程可以看具體的原始碼
Sender中相關邏輯流程圖如下

Untitled Diagram.png
NetworkClient中相關邏輯

NetworClient中呼叫done的地方.png
安利一個良心線上製圖網站ofollow,noindex">https://www.draw.io/
3.3 WHY:為何會一直wait卻沒有被喚醒
通過上面的分析,我們梳理了解除執行緒阻塞(WAIT)的幾個場景和時機,然而不幸的是,上面的場景均沒有機會被執行:
- 在kafka節點宕機時,同步傳送操作的message依然會被加入到生產者緩衝區,因為加入到緩衝區的過程和鏈路情況是解耦的,因此可以成功被塞到buffer
- 由於是同步的過程,因此塞到buffer之後,傳送者便開始了get()的無限期等待,直到有“人”喚醒
- 通過上面的分析我們發現:喚醒該同步等待的操作,都需要在Sender也就是IO執行緒中執行:要麼是由於各種原因覺得這個訊息需要abort,要麼是收到了正確或者錯誤的應答(fail or complete or split)。
- 此時奇妙的現象就發生了:同步等待的操作在IO執行緒,喚醒的操作也是在IO執行緒,這是同一個執行緒!也就是說,此刻已經發生了某種意義的“死鎖 ”
- IO執行緒已經被無限WAITing了,因此buffer中的資料再也無法被髮送
- 於是buffer越堆越多,直到達到buffer sizez之後,開始被block
- producer對block進行了控制,每次最大block的時間為max.block.time,然後向上丟擲一個異常,於是出現了日誌中的現象
綜上,這次生產實踐的原委基本清楚了。關於producer原始碼中的細節,後面再細細研讀~