1. 程式人生 > >Kafka 非同步訊息也會阻塞?記一次 Dubbo 頻繁超時排查過程

Kafka 非同步訊息也會阻塞?記一次 Dubbo 頻繁超時排查過程

線上某服務 A 呼叫服務 B 介面完成一次交易,一次晚上的生產變更之後,系統監控發現服務 B 介面頻繁超時,後續甚至返回執行緒池耗盡錯誤 Thread pool is EXHAUSTED。因為服務 B 依賴外部介面,剛開始誤以為外部介面延時導致,所以臨時增加服務 B dubbo 執行緒池執行緒數量。配置變更之後,重啟服務,服務恢復正常。一段時間之後,服務 B 再次返回執行緒池耗盡錯誤。這次深入排查問題之後,才發現 Kafka 非同步傳送訊息阻塞了 dubbo 執行緒,從而導致呼叫超時。

一、問題分析

Dubbo 2.6.5,Kafak maven 0.8.0-beta1

服務 A 呼叫服務 B,收到如下錯誤:

2019-08-30 09:14:52,311 WARN method [%f [DUBBO] Thread pool is EXHAUSTED! Thread Name: DubboServerHandler-xxxx, Pool Size: 1000 (active: 1000, core: 1000, max: 1000, largest: 1000), Task: 6491 (completed: 5491), Executor status:(isShutdown:false, isTerminated:false, isTerminating:false), in dubbo://xxxx!, dubbo version: 2.6.0, current host: 127.0.0.1

可以看到當前 dubbo 執行緒池已經滿載執行,不能再接受新的呼叫。正常情況下 dubbo 執行緒可以很快完成任務,然後歸還到執行緒池中。由於執行緒執行的任務發生阻塞,消費者端呼叫超時。而服務提供者端由於已有執行緒被阻塞,執行緒池必須不斷建立新執行緒處理任務,直到執行緒數量達到最大數量,系統返回 Thread pool is EXHAUSTED

執行緒任務長時間被阻塞可能原因有:

  • 頻繁的 fullgc,導致系統暫停。
  • 呼叫某些阻塞 API,如 socket 連線未設定超時時間導致阻塞。
  • 系統內部死鎖

通過分析系統堆疊 dump 情況,果然發現所有 dubbo 執行緒都處於 WATTING 狀態。

下圖為應用堆疊 dump 日誌:

從堆疊日誌可以看到 dubbo 執行緒最後阻塞在 LinkedBlockingQueue#put ,而該阻塞發生在 Kafka 傳送訊息方法內。

這裡服務 B 需要使用 Kafka 傳送監控訊息,為了訊息傳送不影響主業務,這裡使用 Kafka 非同步傳送訊息。由於 Kafka 服務端最近更換了對外的埠,而服務 B Kafka 配置未及時變更。最後服務 B 修改配置,服務重新啟動,該問題得以解決。

二、Kafka 非同步模式

下面分析 Kafka 非同步傳送訊息阻塞的實際原因。

0.8.0 Kafka 預設使用同步模式傳送訊息,非同步傳送訊息需要設定producer.type=async屬性。同步模式需要等待 Kafka 將訊息傳送到訊息佇列,這個過程當然會阻塞主執行緒。而非同步模式最大的優點在於無需要等待 Kafka 這個傳送過程。

原本認為這裡的非同步是使用子執行緒去執行任務,但是 Kafka 非同步模式並非這樣。檢視 Kafka 官方文件producer,可以看到對非同步模式描述。

Batching is one of the big drivers of efficiency, and to enable batching the Kafka producer has an asynchronous mode that accumulates data in memory and sends out larger batches in a single request. The batching can be configured to accumulate no more than a fixed number of messages and to wait no longer than some fixed latency bound (say 100 messages or 5 seconds). This allows the accumulation of more bytes to send, and few larger I/O operations on the servers. Since this buffering happens in the client it obviously reduces the durability as any data buffered in memory and not yet sent will be lost in the event of a producer crash.

從上我們可以看到,Kafka 非同步模式將會把多條訊息打包一塊批量傳送到服務端。這種模式將會先把訊息放到記憶體佇列中,直到訊息到達一定數量(預設為 200)或者等待時間超限(預設為 5000ms)。

這麼做最大好處在於提高訊息傳送的吞吐量,減少網路 I/O。當然這麼做也存在明顯劣勢,如果生產者宕機,在記憶體中還未傳送訊息可能就會丟失。

下面從 kafka 原始碼分析這個阻塞過程。

三、Kafka 原始碼解析

Kafka 訊息傳送端採用如下配置:

        Properties props = new Properties();

        props.put("metadata.broker.list", "localhost:9092");
    // 選擇非同步傳送
        props.put("producer.type", "async");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("queue.buffering.max.messages","1");
        props.put("batch.num.messages","1");
        Producer<Integer, String> producer= new Producer(new ProducerConfig(props));
        producer.send(new KeyedMessage("test", "hello world"));

這裡設定 producer.type=async,從而使 Kafka 非同步傳送訊息。

send 方法原始碼如下

ps: 這個版本 Kafka 原始碼採用 Scala 編寫,不過原始碼還是比較簡單,比較容易閱讀。

  def send(messages: KeyedMessage[K,V]*) {
    if (hasShutdown.get)
      throw new ProducerClosedException
    recordStats(messages)
    sync match {
      case true => eventHandler.handle(messages)
    // 由於  producer.type=async 非同步傳送
      case false => asyncSend(messages)
    }
  }

由於我們上面設定 producer.type=async,這裡將會使用 asyncSend 非同步傳送模式。

asyncSend 原始碼如下

  private def asyncSend(messages: Seq[KeyedMessage[K,V]]) {
    for (message <- messages) {
      val added = config.queueEnqueueTimeoutMs match {
        case 0  =>
          queue.offer(message)
        case _  =>
          try {
            config.queueEnqueueTimeoutMs < 0 match {
    
            case true =>
              queue.put(message)
              true
            case _ =>
              queue.offer(message, config.queueEnqueueTimeoutMs, TimeUnit.MILLISECONDS)
            }
          }
          catch {
            case e: InterruptedException =>
              false
          }
      }
      if(!added) {
        producerTopicStats.getProducerTopicStats(message.topic).droppedMessageRate.mark()
        producerTopicStats.getProducerAllTopicsStats.droppedMessageRate.mark()
        throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + message.toString)
      }else {
        trace("Added to send queue an event: " + message.toString)
        trace("Remaining queue size: " + queue.remainingCapacity)
      }
    }
  }

asyncSend 將會把訊息加入到 LinkedBlockingQueue 阻塞佇列中。這裡根據 config.queueEnqueueTimeoutMs引數使用不同方法。

config.queueEnqueueTimeoutMs=0,將會呼叫 LinkedBlockingQueue#offer,如果該佇列未滿,將會把元素插入佇列隊尾。如果佇列未滿,直接返回 false。所以如果此時佇列已滿,訊息不再會加入佇列中,然後 asyncSend 將會丟擲 QueueFullException 異常。

config.queueEnqueueTimeoutMs < 0,將會呼叫 LinkedBlockingQueue#put 加入元素,如果該佇列已滿,該方法將會一直被阻塞直到佇列存在可用空間。

config.queueEnqueueTimeoutMs > 0,將會呼叫 LinkedBlockingQueue#offer,這裡與上面不同之處在於設定超時時間,如果佇列已滿將會阻塞知道超時。

config.queueEnqueueTimeoutMs引數通過 queue.enqueue.timeout.ms 配置生效,預設為 -1。預設情況下 LinkedBlockingQueue 最大數量為 10000,可以通過設定 queue.buffering.max.messages 改變佇列最大值。

訊息放到佇列中後,Kafka 將會使用一個非同步執行緒不斷從佇列中獲取訊息,批量傳送訊息。

非同步處理訊息程式碼如下


  private def processEvents() {
    var lastSend = SystemTime.milliseconds
    var events = new ArrayBuffer[KeyedMessage[K,V]]
    var full: Boolean = false

    // drain the queue until you get a shutdown command
    Stream.continually(queue.poll(scala.math.max(0, (lastSend + queueTime) - SystemTime.milliseconds), TimeUnit.MILLISECONDS))
                      .takeWhile(item => if(item != null) item ne shutdownCommand else true).foreach {
      currentQueueItem =>
        val elapsed = (SystemTime.milliseconds - lastSend)
        // check if the queue time is reached. This happens when the poll method above returns after a timeout and
        // returns a null object
        val expired = currentQueueItem == null
        if(currentQueueItem != null) {
          trace("Dequeued item for topic %s, partition key: %s, data: %s"
              .format(currentQueueItem.topic, currentQueueItem.key, currentQueueItem.message))
          events += currentQueueItem
        }

        // check if the batch size is reached
        full = events.size >= batchSize

        if(full || expired) {
          if(expired)
            debug(elapsed + " ms elapsed. Queue time reached. Sending..")
          if(full)
            debug("Batch full. Sending..")
          // if either queue time has reached or batch size has reached, dispatch to event handler
          tryToHandle(events)
          lastSend = SystemTime.milliseconds
          events = new ArrayBuffer[KeyedMessage[K,V]]
        }
    }
    // send the last batch of events
    tryToHandle(events)
    if(queue.size > 0)
      throw new IllegalQueueStateException("Invalid queue state! After queue shutdown, %d remaining items in the queue"
        .format(queue.size))
  }

這裡非同步執行緒將會不斷從佇列中獲取任務,一旦條件滿足,就會批量傳送任務。該條件為:

  1. 批量訊息數量達到 200,可以設定 batch.num.messages 引數改變配置。
  2. 等待時間到達最大的超時時間,預設為 5000ms,可以設定 queue.buffering.max.ms 改變改配置。

四、問題解決辦法

上面問題雖然通過更換 Kafka 正確地址解決,但是為了預防下次該問題再發生,可以採用如下方案:

  1. 改變 config.queueEnqueueTimeoutMs預設配置,像這種系統監控日誌允許丟失,所以可以設定 config.queueEnqueueTimeoutMs=0
  2. 升級 Kafka 版本,最新版本 Kafka 使用 Java 重寫傳送端邏輯,不再使用阻塞佇列儲存訊息。

本文首發於:studyidea.cn/kafka…

歡迎關注我的公眾號:程式通事,獲得日常乾貨推送。如果您對我的專題內容感興趣,也可以關注我的部落格:studyidea.cn

相關推薦

Kafka 非同步訊息阻塞 Dubbo 頻繁超時排查過程

線上某服務 A 呼叫服務 B 介面完成一次交易,一次晚上的生產變更之後,系統監控發現服務 B 介面頻繁超時,後續甚至返回執行緒池耗盡錯誤 Thread pool is EXHAUSTED。因為服務 B 依賴外部介面,剛開始誤以為外部介面延時導致,所以臨時增加服務 B dubbo 執行緒池執行緒數量。配置變

MySQL-備份失敗的排查過程

          山竹來臨,窩在家裡整理個人文件。        本篇文章主要講解排查問題的思路,涉及linux 刪除檔案的原理、例項誤刪資料恢復、MySQL例項初始化引數優先級別等,雖然涉及知識點比較淺,但是個人覺得挺有意思的,所以翻出筆記釋出出來。  1 備份出錯咯

線上問題的排查過程

問題描述 前不久運維在例行釋出線上CS系統的時候,發現在服務啟動的過程中,後臺一直在報如下錯誤,同時導致使用者頁面訪問異常 2017-10-10 18:28:51,077 [ERROR] org.springframework.amqp.rabbit.l

解Bug之路-儲存故障的排查過程

# 解Bug之路-記一次儲存故障的排查過程 高可用真是一絲細節都不得馬虎。平時跑的好好的系統,在相應硬體出現故障時就會引發出潛在的Bug。偏偏這些故障在應用層的表現稀奇古怪,很難讓人聯想到是硬體出了問題,特別是偶發性出現的問題更難排查。今天,筆者就給大家帶來一個儲存偶發性故障的排查過程。 ## Bug現場 我

改造react腳手架的過程

lease nts rule 加載過程 npm req ems ner comm 公司突然組織需要重新搭建一個基於node的論壇系統,前端采用react,上網找了一些腳手架,或多或少不能滿足自己的需求,最終在基於YeoMan的react腳手架generator-react-

Oracle資料故障排除過程

前天在Oracle生產環境中,自己的儲存過程執行時間超過1小時,懷疑是其他job執行時間過長推遲了自己job執行時間,遂重新跑job,發現同測試環境的確不同,運行了25分鐘。 之後準備在測試環境中製造同數量級的資料進行分析,寫了大概如下的儲存過程, create or replace PROCEDU

dubbo異常

  com.alibaba.dubbo.rpc.RpcException: Forbid consumer 10.x9.xx.xx access service service.com.xx.xx.XxxService from registry 1xx.xx.xx.1:2181

nmap掃描資訊收集過程

1、首先掃描網段記憶體活主機 nmap -sP 192.168.1.1-254   2、獲取到存活主機後,開始進行埠掃描 nmap -p1-1000 192.168.1.66 可得到埠狀態: filtered:被過濾 closed:關閉 open:開放  

WPScan使用完整教程之對WordPress的滲透過程

WPScan使用完整教程之記一次對WordPress的滲透過程 渣渣一枚,萌新一個,會划水,會喊六六 本次簡單的記錄優下自己關於WPScan滲透實戰的案例,以及對於WPScan的一些使用方法,有什麼錯誤的地方希望各位大佬指正(Orz) 一:什麼是WPScan WPScan是一個掃描 WordPress

完整的效能測試過程

當我們聊起效能測試的時候,有人一定會說我們用的是loadrunner做效能,誠然,我們在進行效能測試工作的過程中,需要藉助工具的輔助來幫我們完成一些工作,但loadrunner並不等於效能測試,或者說,效能測試工具不等於效能測試,工具永遠是一種輔助的工具,而不能認為會用工具就會效能測試了!希

ZOOKEEPER叢集超時問題分析

CDH安裝的ZK,三個節點,基本都是預設配置,一直用得正常,今天出現問題,客戶端連線超時6倍時長,預設最大會話超時時間是一分鐘。原因分析:1.首先要確認網路正確。確認時鐘同步。2.檢視現有的配置,基本都是預設配置 JVM配置是1G 有 2g的,不一樣3.檢視dataDir目錄,du -sh .發現已經有五百多

Dubbo導致的記憶體洩漏過程分析及解決

       近日測試團隊反饋版本機測試環境請求經常卡頓,十分緩慢,甚至有超時的情況,但是請求返回、業務邏輯均是正常的,因此進行了一番排查。         首先檢視應用日誌,及控制檯監控,應用均表現無異常,由於版本

spring5原始碼完整編譯過程

學習java已有3年之久,spring一直停留在應用階段,兩次面試阿里的經歷讓我深感學習spring原始碼的重要性,廢話不多說,開搞! 1、環境: jdk1.8+spring5+gradle4.7+eclipse4.6 如果要參考該教程,環境最好一樣(eclipse除外

redis配置優化(線上redis問題排查)

一、問題描述 在通過redis快取進行了一系列的介面效能優化後,大部分介面返回在1ms~200ms間,這都是redis的功勞,但隨著介面redis快取越來越多,新的問題產生了,從redis取資料竟然用了5s = =,通過觀察日誌,並不是每次取資料都是5s,

SQL Server的清理過程

由於歷史原因,庫裡有幾張表的行數已經超過了幾億條,而且99%都是無用的歷史資料(別問我為什麼這麼多,就是這麼刺激),簡單的top 1查詢都能跑個十幾分鍾。 以上,是背景。 業務上來看,伺服器已經完全無法工作了,所以選擇了停機維護。 第一步,使用獲取總行數

Thrift Server錯誤排查

Call exception, tries=26, retries=35, started=432112 ms ago, cancelled=false, msg=row '536051594396' on table 'resys:taobao_comment' at null 2017-02-21 10:

spark任務調優過程

  最近跑了一個spark任務(邏輯迴歸訓練建模的),資料量是3000多萬左右,在跑的過程中出現了各種錯誤,跑了8h左右,結果還是錯了.最後下載了日誌,分析之後,做了些修改,最終跑成功了,而且只用了0.8h,這裡記錄下: 1.maven打包失敗問題:     我的專案是用

重灌windows10系統過程

我的wifi圖示有問題,只能連學校網,用不了其他wifi而且檢視的圖示是灰色無法使用的。故決定重灌系統。但希望能保留ubuntu系統不變,和虛擬機器裡的kali linux系統不變 ,不想再找啟用碼了。。。 雙系統:ubuntu17.1 windows10家庭版 先檢視win

Net軟體逆向的過程(經典)

查殼 1.先看下目錄結構: 2.查下,是什麼語言 ==> Net的,那不用說了,肯定能破解(畢竟是老本行嘛~) 混淆與反混淆 3.dnSpy開啟後發現很多變數是亂碼 4.用de4dot跑一波 5.生成了一個反混淆過的exe 程式除錯 6.改名後開啟,亂碼問題

ceph pg unfound處理過程

今天檢查ceph叢集,發現有pg丟失,於是就有了本文~~~ ### 1.檢視叢集狀態 ```shell [root@k8snode001 ~]# ceph health detail HEALTH_ERR 1/973013 objects unfound (0.000%); 17 scrub errors;