1. 程式人生 > >【轉】解決Maxwell發送Kafka消息數據傾斜問題

【轉】解決Maxwell發送Kafka消息數據傾斜問題

select color efault rac -s hashcode The pic targe

最近用Maxwell解析MySQL的Binlog,發送到Kafka進行處理,測試的時候發現一個問題,就是Kafka的Offset嚴重傾斜,三個partition,其中一個的offset已經快200萬了,另外兩個offset才不到兩百。
Kafka數據傾斜的問題一般是由於生產者使用的Partition接口實現類對分區處理的問題,一般是對key做hash之後,對分區數取模。當出現數據傾斜時,小量任務耗時遠高於其它任務,從而使得整體耗時過大,未能充分發揮分布式系統的並行計算優勢(參考Apache Kafka 0.10 技術內幕:數據傾斜詳解)。
而使用Maxwell解析MySQL的Binlog發送到Kafka的時候,生產者是Maxwell

,那麽數據傾斜的問題明細就是Maxwell引起的了。

Maxwell官網查文檔(Producers:kafka-partitioning Maxwell’s Daemon)得知,在Maxwell沒有配置的情況下,默認使用數據庫名作為計算分區的key,並使用Java默認的hashcode算法進行計算:

  1. A binlog event‘s partition is determined by the selected hash function and hash string as follows
  2. | HASH_FUNCTION(HASH_STRING) % TOPIC.NUMBER_OF_PARTITIONS
  3. The HASH_FUNCTION is either java‘s hashCode or murmurhash3. The default HASH_FUNCTION
  4. is hashCode. Murmurhash3 may be set with the kafka_partition_hash option.
  5. …………
  6. The HASH_STRING may be (database, table, primary_key, column). The default HASH_STRING
  7. is the database. The partitioning field can be configured using the
  8. producer_partition_by option.

而在很多業務系統中,不同數據庫的活躍度差異是很大的,主體業務的數據庫操作頻繁,產生的Binlog也就很多,而Maxwell默認使用數據庫作為key進行hash,那麽顯而易見,Binglog的操作經常都被分到同一個分區裏面了。

於是我們在Maxwell啟動命令中加入對應參數即可,這裏我選擇了Rowkey作為分區key,同時選用murmurhash3
哈希算法,以獲得更好的效率和分布:

nohup /opt/maxwell-1.11.0/bin/maxwell --user=maxwell --password=*** --host=***
--exclude_dbs=/^(mysql|maxwell|test)/ --producer=kafka --kafka.bootstrap.servers=***
--kafka_partition_hash=murmur3 --producer_partition_by=primary_key >> /root/maxwell.log &

用此命令重新啟動Maxwell之後,觀察Offset的變化,隔一段時間之後,各分區Offset的增量基本一致,問題解決!

Reference:

https://leibnizhu.gitlab.io/2018/01/03/%E8%A7%A3%E5%86%B3Maxwell%E5%8F%91%E9%80%81Kafka%E6%B6%88%E6%81%AF%E6%95%B0%E6%8D%AE%E5%80%BE%E6%96%9C%E9%97%AE%E9%A2%98/index.html 轉發此文 解決Maxwell發送Kafka消息數據傾斜問題

http://ningg.top/apache-kafka-10-best-practice-tips-data-skew-details/ Apache Kafka 0.10 技術內幕:數據傾斜詳解

【轉】解決Maxwell發送Kafka消息數據傾斜問題