1. 程式人生 > >SparkStreaming消費Kafka數據限速問題

SparkStreaming消費Kafka數據限速問題

使用 font cor 計算 ont 消息 易用 per stream

SparkStreaming消費Kafka數據的時候,當有大量初始化數據時會拖累整個streaming程序的運行,問有什麽辦法?

總體來說這個問題大概有兩種解決思路:

1.在Spark端設置限速;2.在Kafka端設置限速。

Spark端限速的方法知乎上已有很多帖子說過了,主要的思路是設置不同的參數,比如在Direct模式下設spark.streaming.kafka.maxRatePerPartition,receiver模式下設置spark.streaming.receiver.maxRate。它們都是控制每秒處理的消息數。應該說目前使用Direct模式的比較多,因此你需要適當地調整spark.streaming.kafka.maxRatePerPartition值。

在Kafka端設置限速有兩種辦法:

1. 設置broker端參數quota.consumer.default。比如quota.consumer.default=15728640表示將連入該broker的所有consumer的TPS降到15MB/s以下。此參數的好處在於全局生效簡單易用,對broker上所有consumer都是”一視同仁“;缺陷也在於此,無法單獨為個別consumer限速,故該方法在0.11.0.0版本之後已經不推薦使用。

2. 通過kafka-configs命令。比如下面命令是為client.id為clientA的consumer設置限速:

$ bin/kafka-configs.sh --zookeeper localhost:2181

--alter

--add-config ‘consumer_byte_rate=15728640‘

--entity-type clients

--entity-name clientA

此命令只為client.id=clientA的consumer設置了限速,故在Spark端你還需要顯式設置client.id,

比如:

Map<String, Object> kafkaParams = new HashMap<>();

...

kafkaParams.put("client.id", "clientA");

...

JavaInputDStream<ConsumerRecord<String, String>>

stream = KafkaUtils.createDirectStream(...);

值得註意的是,在Kafka端設置的限速單位都是每秒字節數。如果你想按照每秒多少條消息進行限速還需要結合消息的平均大小來計算。

SparkStreaming消費Kafka數據限速問題