
High CPU usage and failure to consume messages caused by the kafka-python client


Today I ran into a problem while using kafka-python 1.3.3 to consume from a Kafka broker running version 1.0.1. After a rebalance, partitions were assigned to the client, but CPU usage stayed high and no messages were ever consumed.

I first checked for connection and code problems. Finding nothing wrong there, I turned my attention to the kafka-python client itself.

Searching for related problems, the first hit was kafka-python issue 1033:

When no module exists to handle Snappy decompression, the KafkaConsumer returns no messages, rather than reporting the problem. This differs from the legacy Consumer API which provides a much more useful error message.

Background

I was attempting to fetch some data from a Kafka topic which was using snappy compression. No data was ever returned even though I knew data was being landed in the topic (confirmed with the Kafka CLI tools). This had me very confused.

>>> consumer = kafka.KafkaConsumer("test", bootstrap_servers=["svr:9092"])
>>> consumer.poll(5000)
{}

I then attempted to use the legacy consumer API which pointed me to the exact problem.

>>> client = kafka.SimpleClient("svr:9092")
>>> consumer.close()
>>> consumer = kafka.SimpleConsumer(client, "group", "test")
>>> for message in consumer:
...     print(message)
...
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 353, in __iter__
    message = self.get_message(True, timeout)
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 305, in get_message
    return self._get_message(block, timeout, get_partition_info)
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 320, in _get_message
    self._fetch()
  File "/usr/lib/python2.7/site-packages/kafka/consumer/simple.py", line 379, in _fetch
    fail_on_error=False
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 665, in send_fetch_request
    KafkaProtocol.decode_fetch_response)
  File "/usr/lib/python2.7/site-packages/kafka/client.py", line 295, in _send_broker_aware_request
    for payload_response in decoder_fn(future.value):
  File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 212, in decode_fetch_response
    for partition, error, highwater_offset, messages in partitions
  File "/usr/lib/python2.7/site-packages/kafka/protocol/legacy.py", line 219, in decode_message_set
    inner_messages = message.decompress()
  File "/usr/lib/python2.7/site-packages/kafka/protocol/message.py", line 121, in decompress
    assert has_snappy(), 'Snappy decompression unsupported'
AssertionError: Snappy decompression unsupported

All I needed to do was install the python-snappy module to handle the decompression.

pip install python-snappy

This matched my situation almost exactly.

Sure enough, python-snappy was missing from my requirements file, and the producer I was reading from did indeed compress messages with snappy.
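A quick way to check whether the running environment can actually decode snappy-compressed batches is to probe for the `snappy` module that python-snappy provides. This is a minimal stdlib-only sketch; kafka-python's own `has_snappy()` helper in `kafka.codec` performs essentially the same import check.

```python
import importlib.util


def snappy_available() -> bool:
    """Return True if the snappy module (installed by python-snappy) is importable."""
    # kafka-python's kafka.codec.has_snappy() does a similar probe internally.
    return importlib.util.find_spec("snappy") is not None


if __name__ == "__main__":
    if snappy_available():
        print("snappy decompression available")
    else:
        print("missing python-snappy: pip install python-snappy")
```

Running this on the broken environment would have pointed straight at the missing dependency, without waiting for an empty `poll()` to raise suspicion.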

kafka-python fixed this in later versions: if python-snappy is not installed, the error is raised to the caller instead of leaving you guessing.
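The behavioral change can be illustrated with a small sketch. The names below are illustrative, not kafka-python's actual internals: the point is that the fixed client fails loudly on a codec it cannot decode, rather than silently returning an empty poll result.

```python
class UnsupportedCodecError(Exception):
    """Raised when a message batch uses a codec with no installed decoder."""


def decompress_batch(codec: str, payload: bytes, has_snappy: bool = False) -> bytes:
    # Sketch of the post-fix behavior: raise instead of dropping the batch.
    if codec == "none":
        return payload
    if codec == "snappy":
        if not has_snappy:
            raise UnsupportedCodecError(
                "snappy decompression unsupported; pip install python-snappy")
        import snappy  # provided by python-snappy
        return snappy.decompress(payload)
    raise UnsupportedCodecError("unsupported codec: " + codec)
```

With the old behavior the snappy branch was effectively swallowed, so the consumer spun without delivering anything; raising makes the missing dependency immediately visible.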

So I upgraded kafka-python, installed python-snappy, and everything ran happily again!
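Concretely, the fix boiled down to two installs (unpinned here; pin versions as appropriate for your deployment):

```shell
pip install --upgrade kafka-python   # newer client raises on a missing codec instead of failing silently
pip install python-snappy            # provides the snappy module used for decompression
```

Note that on some systems python-snappy needs the native libsnappy library present at build time (e.g. `libsnappy-dev` on Debian/Ubuntu).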

Reference:

https://github.com/dpkp/kafka-python/issues/1033 KafkaConsumer Fails to Report Problem with Compression

https://github.com/dpkp/kafka-python/issues/1315 High CPU usage in KafkaConsumer.poll() when subscribed to many topics with no new messages (possibly SSL related)
