1. 程式人生 > >關於Spark Streaming感知kafka動態分割槽的問題

關於Spark Streaming感知kafka動態分割槽的問題

本文主要是講解Spark Streaming與kafka結合的新增分割槽檢測的問題。讀本文前關於kafka與Spark Streaming結合問題請參考下面兩篇文章:

讀本文前是需要了解Spark Streaming的原理和原始碼結構基礎。

Spark Streaming原始碼系列視訊教程請點閱讀原文進入浪尖的知識星球:Spark技術學院。

kafka 0.8版本

進入正題,之所以會有今天題目的疑惑,是由於在08版本kafka和Spark Streaming結合的DirectStream這種形式的API裡面,是不支援kafka新增分割槽或者topic檢測的。而這個問題,對於很多業務增長比較明顯的公司都是會有碰到相應的問題。

比如,原來的公司業務增長比較明顯,那麼kafka吞吐量,剛開始建立的topic數目和分割槽數目可能滿足不了併發需求,需要增加分割槽。新增加的分割槽會有生產者往裡面寫資料,而Spark Streaming跟kafka 0.8版本結合的API是滿足不了動態發現kafka新增topic或者分割槽的需求的。

這麼說有什麼依據嗎?我們做專案不能人云亦云,所以我們可以從原始碼入手驗證我們的想法。

我們在這裡不會詳細講Spark Streaming原始碼,但是我們可以在這裡思考一下,Spark Streaming分割槽檢測是在哪做的?

很明顯對於批處理的Spark Streaming任務來說,分割槽檢測應該在每次job生成獲取kafkaRDD,來給kafkaRDD確定分割槽數並且每個分割槽賦值offset範圍的時候有牽扯,而這段程式碼就在

DirectKafkaInputDStream#compute方法中。(看過浪尖Spark Streaming原始碼視訊教程的肯定會知道)

那麼我們就貼出這塊原始碼去驗證我們的想法,首先compute方法的第一行:

val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

這裡面獲取的是當前生成KafkaRDD每個分割槽消費的offset的最大值,那麼我們需要進入latestLeaderOffsets進一步去看,可以發現下面一行程式碼:

val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)

這個是根據currentOffsets資訊來獲取最大的offset,由此此處繼續深入發現,由於它只是根據currentOffsets資訊來獲取最大的offset,沒有去感知新增的分割槽,所以Spark Streaming與kafka 0.8結合是不能動態感知分割槽的。

kafka 0.10版本

相似的我們也可以直接去看kafka 0.10這塊的原始碼去檢查,他是否會動態生成kafka分割槽。

進入DirectKafkaInputDStream的compute,看到的第一行程式碼也是:

val untilOffsets = clamp(latestOffsets())

在latestOffsets裡面,有了新的大陸:

640?wx_fmt=png

到這裡本文就算結束了,kafka 0.10版本與SparkStreaming結合支援新增分割槽檢測,這件事告訴我們沒事要多看原始碼,增長見識。

有收穫就點個贊吧。