Integrating Spring with spark-streaming-kafka-0-10, and some problems you may hit (DirectKafkaInputDStream not serializable)

The spark-streaming code examined in this article is version 2.3.0-SNAPSHOT.

To match the client-side changes introduced in Kafka 0.10, Spark Streaming ships a spark-streaming-kafka-0-10 client that is still marked Experimental. Since the old 0.8 client cannot support Kerberos authentication, it is worth studying the source code and architecture of spark-streaming-kafka-0-10.

First, look at the declaration of the method that creates the Kafka stream:
```scala
def createDirectStream[K, V](
    ssc: StreamingContext,
    locationStrategy: LocationStrategy,
    consumerStrategy: ConsumerStrategy[K, V],
    perPartitionConfig: PerPartitionConfig): InputDStream[ConsumerRecord[K, V]] = {
  new DirectKafkaInputDStream[K, V](ssc, locationStrategy, consumerStrategy, perPartitionConfig)
}
```
DirectKafkaInputDStream is initialized with a StreamingContext, a LocationStrategy, a ConsumerStrategy and a PerPartitionConfig. According to the source documentation, locationStrategy is normally PreferConsistent and perPartitionConfig normally uses the default implementation, so neither is covered here. The parameter that actually makes a difference is consumerStrategy, whose role is shown in the source analysis below.
1. Driver consumer
```java
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils
    .createDirectStream(jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
```
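For completeness, the kafkaParams map used above is just a plain map of Kafka consumer properties. A minimal sketch; the broker address, group id and deserializer choices here are placeholder assumptions, not values from the source:

```java
import java.util.HashMap;
import java.util.Map;

public class KafkaParamsExample {
    // Builds the consumer property map passed to createDirectStream.
    // Broker list and group id are placeholder values.
    public static Map<String, Object> build() {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "broker1:9092");
        kafkaParams.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("group.id", "example-group");
        // On the driver this controls automatic offset commits;
        // the executor side forces it to false regardless (see fixKafkaParams).
        kafkaParams.put("enable.auto.commit", false);
        return kafkaParams;
    }

    public static void main(String[] args) {
        System.out.println(build());
    }
}
```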
Taking the initialization code above as an example, DirectKafkaInputDStream first calls its start method to initialize itself; the relevant code is:
```scala
override def start(): Unit = {
  val c = consumer   // initialize the driver-side consumer
  paranoidPoll(c)    // adjust the offset positions
  if (currentOffsets.isEmpty) {
    currentOffsets = c.assignment().asScala.map { tp =>
      tp -> c.position(tp)
    }.toMap
  }
  // don't actually want to consume any messages, so pause all partitions
  c.pause(currentOffsets.keySet.asJava)
}
```
This code initializes a consumer on the driver. Its type is determined by the consumerStrategy mentioned above; the Subscribe implementation is shown below, and amounts to starting a client on the driver that subscribes to the topics in subscribe mode. If initial start offsets were passed in, the consumer's offset cursor is seeked to the corresponding positions.
```scala
private case class Subscribe[K, V](
    topics: ju.Collection[jl.String],
    kafkaParams: ju.Map[String, Object],
    offsets: ju.Map[TopicPartition, jl.Long]
  ) extends ConsumerStrategy[K, V] with Logging {

  def executorKafkaParams: ju.Map[String, Object] = kafkaParams

  def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
    val consumer = new KafkaConsumer[K, V](kafkaParams)
    consumer.subscribe(topics)
    val toSeek = if (currentOffsets.isEmpty) {
      offsets
    } else {
      currentOffsets
    }
    if (!toSeek.isEmpty) {
      // work around KAFKA-3370 when reset is none
      // poll will throw if no position, i.e. auto offset reset none and no explicit position
      // but cant seek to a position before poll, because poll is what gets subscription partitions
      // So, poll, suppress the first exception, then seek
      val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
      val shouldSuppress =
        aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
      try {
        consumer.poll(0)
      } catch {
        case x: NoOffsetForPartitionException if shouldSuppress =>
          logWarning("Catching NoOffsetForPartitionException since " +
            ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none. See KAFKA-3370")
      }
      toSeek.asScala.foreach { case (topicPartition, offset) =>
        consumer.seek(topicPartition, offset)
      }
      // we've called poll, we must pause or next poll may consume messages and set position
      consumer.pause(consumer.assignment())
    }
    consumer
  }
}
```
The other core method of DirectKafkaInputDStream is compute. One of its central jobs is to keep generating the RDD for each batch interval and hand it to a new job. Based on the configured rate limits and the current Kafka topic partitions, it computes the data range of the KafkaRDD assigned to each job, and also takes care of committing offsets.
```scala
override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
  // compute the until offset for each partition of the next batch,
  // honoring maxRate / backpressure limits
  val untilOffsets = clamp(latestOffsets())
  // build the offset list: (topic, partition, fromOffset, untilOffset)
  val offsetRanges = untilOffsets.map { case (tp, uo) =>
    val fo = currentOffsets(tp)
    OffsetRange(tp.topic, tp.partition, fo, uo)
  }
  val useConsumerCache = context.conf.getBoolean(
    "spark.streaming.kafka.consumer.cache.enabled", true)
  // create the RDD from the computed ranges and the rewritten kafkaParams
  val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams,
    offsetRanges.toArray, getPreferredHosts, useConsumerCache)

  // Report the record number and metadata of this batch interval to InputInfoTracker.
  val description = offsetRanges.filter { offsetRange =>
    // Don't display empty ranges.
    offsetRange.fromOffset != offsetRange.untilOffset
  }.map { offsetRange =>
    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
      s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
  }.mkString("\n")
  // Copy offsetRanges to immutable.List to prevent from being modified by the user
  val metadata = Map(
    "offsets" -> offsetRanges.toList,
    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
  val inputInfo = StreamInputInfo(id, rdd.count, metadata)
  ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

  currentOffsets = untilOffsets
  commitAll()
  Some(rdd)
}
```
Note the latestOffsets() method used above, shown below. Through the new consumer API, c.seekToEnd(currentOffsets.keySet.asJava) moves the consumer's offset cursor to the end of each assigned partition. If "enable.auto.commit" is set to "true" in the initial kafkaParams, the driver client will automatically commit the last seeked offset position to Kafka.
```scala
protected def latestOffsets(): Map[TopicPartition, Long] = {
  val c = consumer
  paranoidPoll(c)
  val parts = c.assignment().asScala

  // make sure new partitions are reflected in currentOffsets
  val newPartitions = parts.diff(currentOffsets.keySet)
  // position for new partitions determined by auto.offset.reset if no commit
  currentOffsets = currentOffsets ++ newPartitions.map(tp => tp -> c.position(tp)).toMap
  // don't want to consume messages, so pause
  c.pause(newPartitions.asJava)
  // find latest available offsets
  c.seekToEnd(currentOffsets.keySet.asJava)
  parts.map(tp => tp -> c.position(tp)).toMap
}
```
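Stripped of the Spark and Kafka APIs, the bookkeeping compute performs over currentOffsets and the clamped latest offsets reduces to building a [fromOffset, untilOffset) range per partition. A standalone sketch; the class and method names are illustrative, and partitions are keyed as "topic-partition" strings purely for brevity:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class OffsetRangeSketch {
    // Mirrors OffsetRange: (topic, partition, fromOffset inclusive, untilOffset exclusive).
    public static final class Range {
        public final String topic;
        public final int partition;
        public final long from, until;
        public Range(String topic, int partition, long from, long until) {
            this.topic = topic; this.partition = partition;
            this.from = from; this.until = until;
        }
        public long count() { return until - from; }
    }

    // current: offsets already consumed, per "topic-partition" key
    // until:   clamped latest offsets for the same keys
    public static List<Range> ranges(Map<String, Long> current, Map<String, Long> until) {
        List<Range> out = new ArrayList<>();
        for (Map.Entry<String, Long> e : until.entrySet()) {
            String[] tp = e.getKey().split("-");
            // a newly discovered partition starts at its until offset (empty range)
            long from = current.getOrDefault(e.getKey(), e.getValue());
            out.add(new Range(tp[0], Integer.parseInt(tp[1]), from, e.getValue()));
        }
        return out;
    }
}
```

Each resulting range becomes one KafkaRDD partition in the real implementation.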
2. Executor consumer
The executor consumers are initialized inside KafkaRDD. The fixKafkaParams method adjusts and rewrites part of the program's initial kafkaParams, including the group.id, enable.auto.commit and auto.offset.reset properties.
```scala
private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
  logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
  kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

  logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
  kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

  // driver and executor should be in different consumer groups
  val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
  if (null == originalGroupId) {
    logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
  }
  val groupId = "spark-executor-" + originalGroupId
  logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
  kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)

  // possible workaround for KAFKA-3135
  val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
  if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
    logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
    kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
  }
}
```
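The rewrites fixKafkaParams applies can be reproduced on a plain map. This sketch mirrors only the overrides visible in the source above, with the string keys written out instead of the ConsumerConfig constants:

```java
import java.util.HashMap;

public class FixKafkaParamsSketch {
    // Applies the same overrides fixKafkaParams performs before executors
    // create their consumers; returns a fixed copy of the input.
    public static HashMap<String, Object> fix(HashMap<String, Object> params) {
        HashMap<String, Object> fixed = new HashMap<>(params);
        fixed.put("enable.auto.commit", false);   // executors never auto-commit
        fixed.put("auto.offset.reset", "none");   // executors only read assigned ranges
        Object groupId = fixed.get("group.id");
        // driver and executor should be in different consumer groups
        fixed.put("group.id", "spark-executor-" + groupId);
        Object rbb = fixed.get("receive.buffer.bytes");
        if (rbb == null || (Integer) rbb < 65536) {
            fixed.put("receive.buffer.bytes", 65536);  // workaround for KAFKA-3135
        }
        return fixed;
    }
}
```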
The consumer-related methods inside KafkaRDD are as follows. getPartitions first maps each topic partition to one RDD partition; compute then creates a KafkaRDDIterator. Each KafkaRDDIterator obtains a CachedKafkaConsumer reference through the CachedKafkaConsumer interface and keeps returning ConsumerRecord values from its next() method.
```scala
override def getPartitions: Array[Partition] = {
  offsetRanges.zipWithIndex.map { case (o, i) =>
    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
  }.toArray
}

override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

private class KafkaRDDIterator(
    part: KafkaRDDPartition,
    context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {

  logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

  val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]

  context.addTaskCompletionListener{ context => closeIfNeeded() }

  val consumer = if (useConsumerCache) {
    CachedKafkaConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
    if (context.attemptNumber >= 1) {
      // just in case the prior attempt failures were cache related
      CachedKafkaConsumer.remove(groupId, part.topic, part.partition)
    }
    CachedKafkaConsumer.get[K, V](groupId, part.topic, part.partition, kafkaParams)
  } else {
    CachedKafkaConsumer.getUncached[K, V](groupId, part.topic, part.partition, kafkaParams)
  }

  var requestOffset = part.fromOffset

  def closeIfNeeded(): Unit = {
    if (!useConsumerCache && consumer != null) {
      consumer.close
    }
  }

  override def hasNext(): Boolean = requestOffset < part.untilOffset

  override def next(): ConsumerRecord[K, V] = {
    assert(hasNext(), "Can't call getNext() once untilOffset has been reached")
    val r = consumer.get(requestOffset, pollTimeout)
    requestOffset += 1
    r
  }
}
```
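Underneath the Kafka calls, KafkaRDDIterator is just a cursor walking [fromOffset, untilOffset). A dependency-free sketch of that contract, where the record fetch is replaced by returning the offset itself:

```java
import java.util.Iterator;
import java.util.NoSuchElementException;

public class OffsetRangeIterator implements Iterator<Long> {
    private long requestOffset;        // next offset to fetch
    private final long untilOffset;    // exclusive end of the assigned range

    public OffsetRangeIterator(long fromOffset, long untilOffset) {
        this.requestOffset = fromOffset;
        this.untilOffset = untilOffset;
    }

    @Override
    public boolean hasNext() {
        return requestOffset < untilOffset;
    }

    // In KafkaRDDIterator this is consumer.get(requestOffset, pollTimeout);
    // here we return the offset itself to show the cursor movement.
    @Override
    public Long next() {
        if (!hasNext()) throw new NoSuchElementException("untilOffset reached");
        return requestOffset++;
    }
}
```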
Depending on whether the consumer cache pool is enabled (controlled by spark.streaming.kafka.consumer.cache.enabled), CachedKafkaConsumer provides two static methods for obtaining a consumer client: get() and getUncached().

get() stores and looks up initialized CachedKafkaConsumer objects in CachedKafkaConsumer's static LinkedHashMap field cache, so each executor effectively maintains a consumer connection pool.

getUncached() instead creates a new consumer connection every time new data is pulled, and closes the consumer instance once the RDD task finishes.
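The idea behind the cached path can be sketched with a bounded, access-ordered LinkedHashMap keyed by (groupId, topic, partition). This illustrates the caching pattern only; it is not Spark's exact eviction policy or API:

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

public class ConsumerCacheSketch<C> {
    private final LinkedHashMap<String, C> cache;

    public ConsumerCacheSketch(int maxCapacity) {
        // access-order LinkedHashMap: removeEldestEntry evicts the
        // least recently used consumer once the pool is full
        cache = new LinkedHashMap<String, C>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, C> eldest) {
                return size() > maxCapacity;
            }
        };
    }

    private static String key(String groupId, String topic, int partition) {
        return groupId + " " + topic + " " + partition;
    }

    // Returns the cached consumer for this partition, creating it on first use.
    public C get(String groupId, String topic, int partition, Supplier<C> factory) {
        return cache.computeIfAbsent(key(groupId, topic, partition), k -> factory.get());
    }

    public int size() {
        return cache.size();
    }
}
```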
The code with which CachedKafkaConsumer initializes its Kafka consumer client is shown below. The executor client that actually pulls the data is initialized by subscribing to a single partition in assign mode:
```scala
protected val consumer = {
  val c = new KafkaConsumer[K, V](kafkaParams)
  val tps = new ju.ArrayList[TopicPartition]()
  tps.add(topicPartition)
  c.assign(tps)
  c
}
```
3. Offset commits
Besides the driver-side auto.commit approach mentioned above, Spark Streaming also exposes a commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback) method on DirectKafkaInputDStream for manually triggering offset commits. It puts the offset ranges to commit into a commitQueue; at the end of every compute call, commitAll drains the queue and commits the offsets to Kafka through the driver-side consumer.
```scala
def commitAsync(offsetRanges: Array[OffsetRange], callback: OffsetCommitCallback): Unit = {
  commitCallback.set(callback)
  commitQueue.addAll(ju.Arrays.asList(offsetRanges: _*))
}

protected def commitAll(): Unit = {
  val m = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
  var osr = commitQueue.poll()
  while (null != osr) {
    val tp = osr.topicPartition
    val x = m.get(tp)
    val offset = if (null == x) {
      osr.untilOffset
    } else {
      Math.max(x.offset, osr.untilOffset)
    }
    m.put(tp, new OffsetAndMetadata(offset))
    osr = commitQueue.poll()
  }
  if (!m.isEmpty) {
    consumer.commitAsync(m, commitCallback.get)
  }
}
```
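commitAll's draining loop keeps, for each partition, the largest untilOffset found in the queue. A standalone sketch of just that merge step; names are illustrative and partitions are keyed as strings for brevity:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

public class CommitMergeSketch {
    // Each queued entry pairs a "topic-partition" key with its untilOffset.
    // Drains the queue into a per-partition commit map, as commitAll does.
    public static Map<String, Long> drain(Queue<Map.Entry<String, Long>> commitQueue) {
        Map<String, Long> toCommit = new HashMap<>();
        Map.Entry<String, Long> osr = commitQueue.poll();
        while (osr != null) {
            // keep the largest untilOffset seen for each partition
            toCommit.merge(osr.getKey(), osr.getValue(), Math::max);
            osr = commitQueue.poll();
        }
        return toCommit;
    }
}
```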
```java
stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    // some time later, after outputs have completed
    ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});
```
Note: if you commit offsets manually as in the snippet above from the official documentation, mark the stream field as static or transient so it is excluded from serialization. Otherwise, task submission may fail with a Task not serializable error, because DirectKafkaInputDStream is not serializable.
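The failure mode and the fix can be reproduced with plain Java serialization: an object holding a reference to a non-serializable class fails to serialize unless the field is transient (or static). FakeStream and the two closure classes below are stand-ins for DirectKafkaInputDStream and the foreachRDD task closure, not Spark types:

```java
import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldDemo {
    // Stand-in for DirectKafkaInputDStream: deliberately not Serializable.
    static class FakeStream { }

    // transient keeps the stream reference out of the serialized form,
    // so serialization succeeds.
    static class Closure implements Serializable {
        transient FakeStream stream = new FakeStream();
    }

    // Without transient, serializing the closure drags the stream along
    // and fails with NotSerializableException.
    static class BadClosure implements Serializable {
        FakeStream stream = new FakeStream();
    }

    // Returns true when the object can be Java-serialized.
    public static boolean serializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
```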
Conclusion
The new spark-streaming-kafka-0-10 client uses an architecture completely different from the old version: two groups of consumers run inside one job, the driver consumer and the executor consumers. The driver-side consumer is responsible for allocating offset ranges to the KafkaRDDs it creates and for committing offsets; inside the KafkaRDD, each assigned partition of each topic gets a CachedKafkaConsumer client that subscribes to that single partition via assign and pulls the data.