【原始碼追蹤】SparkStreaming 中用 Direct 方式每次從 Kafka 拉取多少條資料(offset取值範圍)
阿新 • • 發佈:2018-11-09
我們知道 SparkStreaming 用 Direct 的方式拉取 Kafka 資料時,是根據 kafka 中的 fromOffsets 和 untilOffsets 來進行獲取資料的,而 fromOffsets 一般都是需要我們自己管理的,而每批次的 untilOffsets 是由 Driver 程式自動幫我們算出來的。
於是產生了一個疑問:untilOffsets 是怎麼算出來的?
接下來就通過檢視原始碼的方式來找出答案~
首先我們寫一個最簡單的 wordcount 程式,程式碼如下:
/**
* Created by Lin_wj1995 on 2018/4/19.
* 來源:https://blog.csdn.net/Lin_wj1995
*/
object DirectKafkaWordCount {
def main(args: Array[String]) {
val Array(brokers, topics) = args
val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
val topicsSet = topics.split(",").toSet
val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
//拿到資料
val lines = messages.map(_._2)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
wordCounts.print()
// 啟動
ssc.start()
ssc.awaitTermination()
}
}
我們可以看出, createDirectStream
是獲得資料的關鍵方法的,我們點選進去
def createDirectStream[
K: ClassTag,
V: ClassTag,
KD <: Decoder[K]: ClassTag,
VD <: Decoder[V]: ClassTag] (
ssc: StreamingContext,
kafkaParams: Map[String, String],
topics: Set[String]
): InputDStream[(K, V)] = {
val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
//kafka cluster 連線物件
val kc = new KafkaCluster(kafkaParams)
//讀取資料的開始位置
val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
//該方法返回了一個DirectKafkaInputDStream的物件
new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
ssc, kafkaParams, fromOffsets, messageHandler)
}
ok,重點來了,點選 DirectKafkaInputDStream
,看一下該類內部是如何的,由於該類內部的方法都是重點,所有我把該類重點的屬性和方法有選擇性的貼出來:
建議從下往上讀!~
private[streaming]
class DirectKafkaInputDStream[
K: ClassTag,
V: ClassTag,
U <: Decoder[K]: ClassTag,
T <: Decoder[V]: ClassTag,
R: ClassTag](
ssc_ : StreamingContext,
val kafkaParams: Map[String, String],
val fromOffsets: Map[TopicAndPartition, Long],
messageHandler: MessageAndMetadata[K, V] => R
) extends InputDStream[R](ssc_) with Logging {
/**
* 為了拿到每個分割槽leader上的最新偏移量(預設值為1),Driver發出請求的最大的連續重試次數
* 預設值為1,也就是說最多請求 2 次
*/
val maxRetries = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRetries", 1)
/**
* 通過 receiver tracker 非同步地維持和傳送新的 rate limits 給 receiver
* 注意:如果引數 spark.streaming.backpressure.enabled 沒有設定,那麼返回為None
*/
override protected[streaming] val rateController: Option[RateController] = {
/**
* isBackPressureEnabled方法對應著“spark.streaming.backpressure.enabled”引數
* 引數說明:簡單來講就是自動推測程式的執行情況並控制接收資料的條數,為了防止處理資料的時間大於批次時間而導致的資料堆積
* 預設是沒有開啟的
*/
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new DirectKafkaRateController(id,
RateEstimator.create(ssc.conf, context.graph.batchDuration)))
} else {
None
}
}
//拿到與Kafka叢集的連線
protected val kc = new KafkaCluster(kafkaParams)
//每個partition每次最多獲取多少條資料,預設是0
private val maxRateLimitPerPartition: Int = context.sparkContext.getConf.getInt(
"spark.streaming.kafka.maxRatePerPartition", 0)
/**
* 真實算出每個partition獲取資料的最大條數
*/
protected def maxMessagesPerPartition: Option[Long] = {
val estimatedRateLimit = rateController.map(_.getLatestRate().toInt) //每批都根據rateContoller預估獲取多少條資料
val numPartitions = currentOffsets.keys.size
val effectiveRateLimitPerPartition = estimatedRateLimit
.filter(_ > 0)
.map { limit =>
if (maxRateLimitPerPartition > 0) {
/*
如果 spark.streaming.kafka.maxRatePerPartition 該引數有設定值且大於0
那麼就取 maxRateLimitPerPartition 和 rateController 算出來的值 之間的最小值(為什麼取最小值,因為這樣是最保險的)
*/
Math.min(maxRateLimitPerPartition, (limit / numPartitions))
} else {
/*
如果 spark.streaming.kafka.maxRatePerPartition 該引數沒有設定
那麼就直接用 rateController 算出來的值
*/
limit / numPartitions
}
}.getOrElse(maxRateLimitPerPartition) //如果沒有設定自動推測的話,則返回引數設定的接收速率
if (effectiveRateLimitPerPartition > 0) {
val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
Some((secsPerBatch * effectiveRateLimitPerPartition).toLong)
} else {
/*
如果沒有設定 spark.streaming.kafka.maxRatePerPartition 引數,則返回None
*/
None
}
}
//拿到每批的起始 offset
protected var currentOffsets = fromOffsets
/**
* 獲取此時此刻topic中每個partition 最大的(最新的)offset
*/
@tailrec
protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
// Either.fold would confuse @tailrec, do it manually
if (o.isLeft) {
val err = o.left.get.toString
if (retries <= 0) {
throw new SparkException(err)
} else {
log.error(err)
Thread.sleep(kc.config.refreshLeaderBackoffMs)
latestLeaderOffsets(retries - 1)//如果獲取失敗,則重試,且重試次數 -1
}
} else {
o.right.get //如果沒有問題,則拿到最新的 offset
}
}
// limits the maximum number of messages per partition
/**
* ★★★★★重要方法,答案就在這裡
* @param leaderOffsets 該引數的offset是當前最新的offset
* @return 包含untilOffsets的資訊
*/
protected def clamp(
leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
maxMessagesPerPartition.map { mmp =>
leaderOffsets.map { case (tp, lo) =>
/**
* 如果有設定自動推測,那麼就將值設定為: min(自動推測出來的offset,此時此刻最新的offset)
*/
tp -> lo.copy(offset = Math.min(currentOffsets(tp) + mmp, lo.offset))
}
}.getOrElse(leaderOffsets) //如果沒有設定自動推測,那麼untilOffsets的值就是最新的offset
}
override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
//====》★★★★★從這裡作為入口盡心檢視
val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
//根據offset去拉取資料,完!
val rdd = KafkaRDD[K, V, U, T, R](
context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)
。。。
答案找到了,下面的就不寫了
。。。。