1- Kafka 0.8.2 and above, below Kafka 0.10


1-1 Receiver-based approach

This approach uses Kafka's high-level API. Messages received from Kafka are stored in Spark executors, and Spark Streaming then launches jobs to process the data. The source code is analyzed below.

1-1-1 Overriding the Receiver's onStart method

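// Initialize the block generator for storing Kafka message.
blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

1-1-1-1 When the BlockGenerator is constructed, it creates a timer:

private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs,
  updateCurrentBuffer, "BlockGenerator")

Inside RecurringTimer, a daemon thread keeps triggering the action for the next interval until the timer is stopped:

private val thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}

private def loop() {
  try {
    while (!stopped) { triggerActionForNextInterval() }
  } catch { case e: InterruptedException => }
}

The timer's callback, updateCurrentBuffer, turns the current buffer into a block and receives new data into a fresh, empty buffer:

try {
  var newBlock: Block = null
  synchronized {
    if (currentBuffer.nonEmpty) {
      // The interval has elapsed and the buffer holds data: swap in a new buffer
      // for the next batch and wrap the old buffer into a new Block
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
      listener.onGenerateBlock(blockId)
      newBlock = new Block(blockId, newBlockBuffer)
    }
  }
  if (newBlock != null) {
    blocksForPushing.put(newBlock) // put is blocking when queue is full
  }
} catch {
  case ie: InterruptedException => logInfo("Block updating timer thread was interrupted")
  case e: Exception => reportError("Error in block updating thread", e)
}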
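The timer-driven buffer swap above can be modelled in plain Scala without Spark. This is a minimal, hypothetical sketch: SimpleBlock and SimpleBlockGenerator are illustrative names, not Spark classes, and a ScheduledExecutorService stands in for RecurringTimer.

```scala
import java.util.concurrent.{ArrayBlockingQueue, Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for a generated block: an id plus the records it holds.
final case class SimpleBlock(id: Long, records: List[Any])

// Hypothetical, simplified model of BlockGenerator (not Spark's class).
final class SimpleBlockGenerator(blockIntervalMs: Long, queueSize: Int = 10) {
  private var currentBuffer = new ArrayBuffer[Any]
  val blocksForPushing = new ArrayBlockingQueue[SimpleBlock](queueSize)
  // A scheduled executor plays the role of RecurringTimer here.
  private val timer = Executors.newSingleThreadScheduledExecutor()

  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Like updateCurrentBuffer: swap the buffer out and queue it, but only if it holds data.
  def updateCurrentBuffer(time: Long): Unit = {
    var newBlock: SimpleBlock = null
    synchronized {
      if (currentBuffer.nonEmpty) {
        val newBlockBuffer = currentBuffer
        currentBuffer = new ArrayBuffer[Any]
        newBlock = SimpleBlock(time - blockIntervalMs, newBlockBuffer.toList)
      }
    }
    // put() blocks while the queue is full, which back-pressures the timer thread.
    if (newBlock != null) blocksForPushing.put(newBlock)
  }

  def start(): Unit = {
    val tick: Runnable = () => updateCurrentBuffer(System.currentTimeMillis())
    timer.scheduleAtFixedRate(tick, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = timer.shutdownNow()
}
```

Calling updateCurrentBuffer by hand instead of start() keeps the behaviour deterministic: a tick on an empty buffer produces no block.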

1-1-1-2 The other thread in BlockGenerator is:

private val blockPushingThread = new Thread() {
  override def run() { keepPushingBlocks() }
}

Its main job is to call keepPushingBlocks, which pushes the generated blocks into the BlockManager.
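The pushing side can be sketched as a consumer thread draining the block queue. This is a hypothetical model (BlockPusher is an illustrative name); the `push` function stands in for whatever stores the block, which in Spark ends up in the BlockManager.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical sketch of the keepPushingBlocks loop (not Spark's code): a dedicated
// thread drains the block queue and hands each block to `push`.
final class BlockPusher[B <: AnyRef](push: B => Unit) {
  val blocksForPushing = new LinkedBlockingQueue[B]()
  @volatile private var stopped = false

  private val blockPushingThread = new Thread("block-pushing-thread") {
    override def run(): Unit = {
      // Keep going until stopped, then drain whatever is still queued.
      while (!stopped || !blocksForPushing.isEmpty) {
        // poll with a timeout so the loop can notice the stop flag
        val block = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
        if (block != null) push(block)
      }
    }
  }

  def start(): Unit = blockPushingThread.start()

  // Signal the thread to finish and wait until every queued block has been pushed.
  def stopAndDrain(): Unit = {
    stopped = true
    blockPushingThread.join()
  }
}
```

Polling with a timeout, rather than a blocking take(), is a design choice of this sketch so that the loop can exit cleanly once stopped and the queue is empty.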

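1-1-1-3 Thread pool

// One daemon thread per consumer stream: the pool size is the total thread count over all topics
messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
  topics.values.sum, "KafkaMessageHandler")

// Instantiate the configured key/value decoders through their VerifiableProperties constructor
val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(consumerConfig.props)
  .asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(consumerConfig.props)
  .asInstanceOf[Decoder[V]]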

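1-1-1-4 Build the Kafka message streams, which returns a Map[String, List[KafkaStream[K, V]]]

val topicMessageStreams = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder)

consumerConnector is Kafka's high-level ConsumerConnector, whose createMessageStreams is declared as:
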
  def createMessageStreams[K,V](topicCountMap: Map[String,Int],
                                keyDecoder: Decoder[K],
                                valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]]

1-1-1-5 Submit tasks to the thread pool to start consuming messages


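topicMessageStreams.values.foreach { streams => // iterate over each topic's list of streams
  streams.foreach { stream => // iterate over the streams of that topic
    // Start one thread per consumer KafkaStream to receive (i.e. consume) its messages
    messageHandlerThreadPool.submit(new MessageHandler(stream))
  }
}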


private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
  override def run(): Unit = {
    while (!isStopped) {
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext) {
          // Hand every message (with its topic, partition and offset) to the block generator
          storeMessageAndMetadata(streamIterator.next)
        }
      } catch {
        case e: Exception =>
          reportError("Error handling message", e)
      }
    }
  }
}
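The submit-one-handler-per-stream pattern of 1-1-1-3 to 1-1-1-5 can be sketched without Kafka. Hypothetical assumptions: each KafkaStream is replaced by an Iterator[String], a plain fixed thread pool that is shut down explicitly replaces the daemon pool, and handlers drain finite iterators instead of looping until the receiver stops.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Hypothetical model: Iterator[String] stands in for KafkaStream.
object MessageHandlerPoolSketch {
  def consumeAll(topicMessageStreams: Map[String, List[Iterator[String]]]): List[String] = {
    // One thread per stream, i.e. topics.values.sum in the receiver.
    val poolSize = math.max(1, topicMessageStreams.values.map(_.size).sum)
    val pool = Executors.newFixedThreadPool(poolSize)
    val received = ArrayBuffer[String]()
    topicMessageStreams.values.foreach { streams =>
      streams.foreach { stream =>
        pool.submit(new Runnable {
          // Like MessageHandler.run: drain this stream and store every message.
          override def run(): Unit =
            while (stream.hasNext) {
              val msg = stream.next()
              received.synchronized { received += msg }
            }
        })
      }
    }
    pool.shutdown() // no new tasks; already-submitted handlers run to completion
    pool.awaitTermination(10, TimeUnit.SECONDS)
    received.synchronized { received.toList }
  }
}
```

Messages from different streams interleave in arbitrary order, since each stream has its own thread; only the order within one stream is preserved.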


private def storeMessageAndMetadata(
    msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
  val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
  val data = (msgAndMetadata.key, msgAndMetadata.message)
  val metadata = (topicAndPartition, msgAndMetadata.offset)
  blockGenerator.addDataWithCallback(data, metadata)
}


def addDataWithCallback(data: Any, metadata: Any): Unit = {
  if (state == Active) {
    synchronized {
      if (state == Active) {
        currentBuffer += data
        // Let the listener (GeneratedBlockHandler) record the offset of this message
        listener.onAddData(data, metadata)
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}


def onAddData(data: Any, metadata: Any): Unit = {
  // Update the offset of the data that was added to the generator
  if (metadata != null) {
    val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
    updateOffset(topicAndPartition, offset)
  }
}




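-> commitOffset (updates the offset of each partition of each topic in ZooKeeper). Question: why does the offset need to be committed here? Isn't a manual commit needed once a batch has finished processing?

The offsets and owners are kept under these ZooKeeper paths (from Kafka's ZKGroupTopicDirs):

def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
def consumerOwnerDir = consumerGroupDir + "/owners/" + topic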
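The resulting ZooKeeper layout can be illustrated with a few string helpers. This is a hypothetical sketch, not Kafka's ZKGroupTopicDirs class; it assumes Kafka 0.8's consumers root "/consumers", so that consumerGroupDir is /consumers/<group>, and that each partition's offset is a child node named after the partition id.

```scala
// Hypothetical helpers reproducing the ZooKeeper paths used for high-level consumer offsets.
object ZkOffsetPaths {
  // Assumption: the consumers root is "/consumers", as in Kafka 0.8.
  val ConsumersPath = "/consumers"

  def consumerGroupDir(group: String): String = ConsumersPath + "/" + group
  def consumerOffsetDir(group: String, topic: String): String =
    consumerGroupDir(group) + "/offsets/" + topic
  def consumerOwnerDir(group: String, topic: String): String =
    consumerGroupDir(group) + "/owners/" + topic
  // Each partition's committed offset is a child node named after the partition id.
  def partitionOffsetPath(group: String, topic: String, partition: Int): String =
    consumerOffsetDir(group, topic) + "/" + partition

  def main(args: Array[String]): Unit =
    println(partitionOffsetPath("my-group", "topic1", 0))
}
```

This layout is what ZooKeeper-based monitoring tools read, which is why the direct approach's drawback (no offsets in ZooKeeper) matters.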


import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

The part that is hard to understand is mainly "per-topic number of Kafka partitions to consume". Look at the KafkaWordCount example in the Spark project:

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))

// Several topics can be passed, e.g. topics "topic1,topic2" with numThreads "16": consume two topics, starting 16 receiving threads for each topic
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
// topicMap here is Map("topic1" -> 16, "topic2" -> 16)
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
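The argument handling can be checked without Spark. A minimal sketch (TopicMapSketch is a hypothetical name) that builds the same topicMap and totals the threads that end up in the receiver's pool:

```scala
// Standalone illustration of KafkaWordCount's argument handling (no Spark needed).
object TopicMapSketch {
  // "topic1,topic2" + "16" -> each topic gets 16 consumer threads in the single receiver
  def topicMap(topics: String, numThreads: String): Map[String, Int] =
    topics.split(",").map((_, numThreads.toInt)).toMap

  def main(args: Array[String]): Unit = {
    val m = topicMap("topic1,topic2", "16")
    println(m)
    // The receiver's thread pool size is topics.values.sum
    println(s"threads in the receiver: ${m.values.sum}")
  }
}
```

Two topics at 16 threads each give the receiver a pool of 32 threads, but all of them run inside one receiver on one executor.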

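- Kafka topic partitions do not correspond to the partitions of the RDDs generated by Spark Streaming. Increasing the per-topic partition count passed to KafkaUtils.createStream() only increases the number of threads consuming topics within a single receiver; it does not increase Spark's parallelism in processing the data.
- You can create multiple Kafka input DStreams with different consumer groups and topics, so that data is received in parallel by multiple receivers.
- If the write-ahead log (WAL) is enabled and stored on HDFS, the received data is already replicated in the log. The storage level of such an input stream should therefore be StorageLevel.MEMORY_AND_DISK_SER.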

1-2 Direct approach (no receivers)

Once the job starts, Spark Streaming uses Kafka's simple consumer API to read the messages in the defined offset ranges directly, much like reading from a file system (seek to an offset and read). Kafka's simple API has three main features:
- Read a message multiple times (re-reading)
- Consume only a subset of the partitions in a topic in a process (selective reading)
- Manage transactions to make sure a message is processed once and only once (exactly-once primitive)

In return, the application has to do work that the high-level consumer would otherwise handle:
- You must keep track of the offsets in your application to know where you left off consuming (manage offsets yourself).
- You must figure out which broker is the lead broker for a topic and partition.
- You must handle broker leader changes.

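Compared with the receiver-based approach, the direct approach has the following properties:
- Simplified parallelism. There is no need to create multiple Kafka input streams and union them. With directStream, Spark Streaming creates as many RDD partitions as there are Kafka partitions to consume, all reading from Kafka in parallel, so Kafka partitions and RDD partitions map one to one.
- Efficiency. To achieve zero data loss, the first approach requires the data to be written to a WAL, which replicates it again, so the data is effectively replicated twice: once by Kafka and once by the WAL. The direct approach has no receiver and therefore needs no WAL; as long as Kafka's retention is long enough, messages can be recovered from Kafka.
- Exactly-once semantics. The first approach uses Kafka's high-level API to store offsets in ZooKeeper, which is the traditional way of consuming data from Kafka. However, it only guarantees zero data loss, i.e. at-least-once semantics: some records may be consumed twice under certain failures. This happens because the offsets tracked by Spark Streaming and those stored in ZooKeeper can become inconsistent. The direct approach therefore uses the simple Kafka API, which does not depend on ZooKeeper, and Spark Streaming tracks the offsets in its checkpoints, which removes that inconsistency. To achieve exactly-once semantics end to end, the output operation that saves results to an external store must either be idempotent or save the results and offsets atomically (see Semantics of output operations in the programming guide for more information).
- Drawback. The only disadvantage of this approach is that it does not update offsets in ZooKeeper, so ZooKeeper-based Kafka monitoring tools cannot show consumption progress. You can, however, update the offsets in ZooKeeper manually after each batch, as described below.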
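The "save results and offsets atomically" option can be sketched with an in-memory store. Everything here is hypothetical (BatchRange, StoreState and TransactionalStore are illustrative names); a real job would use a database transaction. The store keeps the results and the last committed untilOffset per (topic, partition) in one immutable value, so a batch replayed after a failure with the same offset range is detected and skipped.

```scala
// Hypothetical offset range of one batch for one partition.
final case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Results (word counts) and committed offsets live in a single immutable snapshot.
final case class StoreState(counts: Map[String, Long], offsets: Map[(String, Int), Long])

// Hypothetical in-memory stand-in for a transactional external store.
final class TransactionalStore {
  private var state = StoreState(Map.empty, Map.empty)

  def current: StoreState = synchronized { state }

  // Returns true if the batch was applied, false if its offsets were already committed.
  def commit(range: BatchRange, wordCounts: Map[String, Long]): Boolean = synchronized {
    val key = (range.topic, range.partition)
    val committed = state.offsets.getOrElse(key, 0L)
    if (range.untilOffset <= committed) {
      false // replay of an already-stored range: skip it
    } else {
      val merged = wordCounts.foldLeft(state.counts) { case (acc, (word, n)) =>
        acc.updated(word, acc.getOrElse(word, 0L) + n)
      }
      // Results and offsets are replaced together, so they can never diverge.
      state = StoreState(merged, state.offsets.updated(key, range.untilOffset))
      true
    }
  }
}
```

The skip rule relies on a replayed batch covering the same offset range as the original, which is what recovering the direct stream's offsets from a checkpoint is meant to provide.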


 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

You can also pass a messageHandler to createDirectStream to access the MessageAndMetadata object, which contains metadata about the current message, and transform each message into any desired type. See the API docs and the example.

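For the Kafka parameters, you must specify either metadata.broker.list or bootstrap.servers. By default, consumption starts from the latest offset of each Kafka partition. If you set auto.offset.reset to smallest in the configuration, consumption starts from the smallest offset instead, as the code shows: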


   * Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the `StreamingContext`. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every records is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.


  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    // If the caller does not pass fromOffsets explicitly, getFromOffsets computes the starting offsets
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics)
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }

  private[kafka] def getFromOffsets(
      kc: KafkaCluster,
      kafkaParams: Map[String, String],
      topics: Set[String]
    ): Map[TopicAndPartition, Long] = {
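    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      // If auto.offset.reset is smallest, start from the smallest offset of each topic-partition
      leaderOffsets <- (if (reset == Some("smallest")) {
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else { // otherwise, start from the latest offsets
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
    } yield {
      leaderOffsets.map { case (tp, lo) =>
          (tp, lo.offset)
      }
    }
    KafkaCluster.checkErrors(result)
  }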
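Stripped of KafkaCluster and Either, the decision getFromOffsets makes reduces to the following. A hypothetical sketch: partitions are (topic, partition) tuples, and the earliest/latest maps stand in for what getEarliestLeaderOffsets and getLatestLeaderOffsets would return.

```scala
// Hypothetical, dependency-free mirror of the auto.offset.reset decision in getFromOffsets.
object FromOffsetsSketch {
  type TP = (String, Int) // (topic, partition), standing in for TopicAndPartition

  def fromOffsets(
      kafkaParams: Map[String, String],
      earliest: Map[TP, Long],
      latest: Map[TP, Long]): Map[TP, Long] = {
    // Case-insensitive, like .map(_.toLowerCase) in the original
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    if (reset == Some("smallest")) earliest // start from the earliest offset still in the log
    else latest                              // default: only consume new messages
  }

  def main(args: Array[String]): Unit = {
    val earliest = Map(("topic1", 0) -> 0L)
    val latest = Map(("topic1", 0) -> 42L)
    println(fromOffsets(Map("auto.offset.reset" -> "smallest"), earliest, latest))
    println(fromOffsets(Map.empty, earliest, latest))
  }
}
```

Any value other than smallest, including a missing key, falls through to the latest offsets.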


// Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

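 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd // transform must return the RDD so the chain can continue
 }.map { record =>
   record // placeholder for your own transformations
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   // ... further output operations
 }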


  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    // Throttle Spark Streaming according to the configured maximum rate; in effect this computes the ending offsets
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    // The next batch starts where this one ends
    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }
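The offset bookkeeping in compute() can be modelled without Spark. In this hypothetical sketch, SimpleOffsetRange and DirectStreamModel are illustrative names, and the ending offsets are passed in directly instead of coming from clamp(latestLeaderOffsets(...)).

```scala
// Hypothetical model of the bookkeeping in compute(): not Spark's OffsetRange.
final case class SimpleOffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long) {
  def count: Long = untilOffset - fromOffset
}

final class DirectStreamModel(private var currentOffsets: Map[(String, Int), Long]) {
  // One batch reads [currentOffsets(tp), untilOffsets(tp)) for every partition.
  def nextBatch(untilOffsets: Map[(String, Int), Long]): Seq[SimpleOffsetRange] = {
    val ranges = currentOffsets.toSeq.map { case ((topic, partition), from) =>
      SimpleOffsetRange(topic, partition, from, untilOffsets((topic, partition)))
    }
    currentOffsets = untilOffsets // the next batch starts where this one ends
    ranges
  }

  // Same format as the batch description in compute(); empty ranges are skipped.
  def description(ranges: Seq[SimpleOffsetRange]): String =
    ranges.filter(r => r.fromOffset != r.untilOffset).map { r =>
      s"topic: ${r.topic}\tpartition: ${r.partition}\t" +
        s"offsets: ${r.fromOffset} to ${r.untilOffset}"
    }.mkString("\n")
}
```

Because each batch's starting offsets are exactly the previous batch's ending offsets, consecutive batches cover the log without gaps or overlaps.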


Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

Another thing to note is that since this approach does not use receivers, the standard receiver-related configurations (that is, configurations of the form spark.streaming.receiver.*) will not apply to the input DStreams created by this approach (they still apply to other input DStreams). Instead, use the configurations spark.streaming.kafka.*. An important one is spark.streaming.kafka.maxRatePerPartition, which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.


  def getLatestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // latest point in time

  def getEarliestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // earliest point in time

  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isn't serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }


  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long,
      maxNumOffsets: Int
    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>//Find an active Broker and find out which Broker is the leader for your topic and partition
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs += new SparkException(
                  s"Empty offsets for ${tp}, is ${before} before log beginning?")
              }
            } else {
              errs += ErrorMapping.exceptionFor(por.error)
            }
          }
        }
        // Return early once offsets for every requested partition have been collected
        if (result.keys.size == topicAndPartitions.size) {
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs += new SparkException(s"Couldn't find leader offsets for ${missing}")
      Left(errs)
    }
  }
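The first step of getLeaderOffsets inverts "partition to leader" into "leader to partitions" so that one OffsetRequest can be sent per leader broker. The excerpt calls a helper named flip; the generic version below is a hypothetical sketch using groupBy, with tuples standing in for TopicAndPartition, broker addresses and PartitionOffsetRequestInfo.

```scala
// Hypothetical sketch of the leader-grouping step in getLeaderOffsets.
object LeaderGroupingSketch {
  type TP = (String, Int)     // (topic, partition)
  type Broker = (String, Int) // (host, port) of the leader

  // Invert partition -> leader into leader -> partitions.
  def flip[K, V](m: Map[K, V]): Map[V, Seq[K]] =
    m.groupBy(_._2).map { case (v, entries) => v -> entries.keys.toSeq }

  // One request per leader, asking for up to maxNumOffsets offsets before `before`
  // for each of its partitions (tuples stand in for PartitionOffsetRequestInfo).
  def requestsPerLeader(
      tpToLeader: Map[TP, Broker],
      before: Long,
      maxNumOffsets: Int): Map[Broker, Map[TP, (Long, Int)]] =
    flip(tpToLeader).map { case (leader, tps) =>
      leader -> tps.map(tp => tp -> ((before, maxNumOffsets))).toMap
    }
}
```

Grouping by leader keeps the number of network round trips equal to the number of leader brokers rather than the number of partitions.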


  // limits the maximum number of messages per partition
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    val offsets = leaderOffsets.mapValues(lo => lo.offset)

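    // Call maxMessagesPerPartition to compute the maximum number of messages to consume from each partition
    maxMessagesPerPartition(offsets).map { mmp =>
      mmp.map { case (tp, messages) =>
        val lo = leaderOffsets(tp)
        // Ending offset = starting offset + allowed messages, but never beyond the leader's latest offset
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }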


  protected[streaming] def maxMessagesPerPartition(
      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    val estimatedRateLimit = rateController.map(_.getLatestRate())

    // calculate a per-partition rate limit based on current lag
    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        val lagPerPartition = offsets.map { case (tp, offset) =>
          tp -> Math.max(offset - currentOffsets(tp), 0)
        }
        val totalLag = lagPerPartition.values.sum

        lagPerPartition.map { case (tp, lag) =>
          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
          tp -> (if (maxRateLimitPerPartition > 0) {
            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
        }
      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
    }

    if (effectiveRateLimitPerPartition.values.sum > 0) {
      // Batch duration (seconds) times the limit is the maximum number of messages this partition
      // may consume; adding the starting offset (in clamp) gives the ending offset
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
  }
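The arithmetic in maxMessagesPerPartition and clamp can be checked with a Spark-free restatement. This is a hypothetical sketch: RateLimitSketch is an illustrative name, estimatedRate stands in for the rate controller's latest backpressure estimate, maxRatePerPartition for spark.streaming.kafka.maxRatePerPartition (0 meaning no cap), and the proportional split uses Double instead of the original's Float.

```scala
// Hypothetical, dependency-free restatement of maxMessagesPerPartition + clamp.
object RateLimitSketch {
  type TP = (String, Int) // (topic, partition)

  def maxMessagesPerPartition(
      latest: Map[TP, Long],
      current: Map[TP, Long],
      estimatedRate: Option[Double], // backpressure estimate in records/sec, if enabled
      maxRatePerPartition: Long,     // 0 means "no per-partition cap"
      batchMs: Long): Option[Map[TP, Long]] = {
    val effectiveRatePerPartition: Map[TP, Long] = estimatedRate.filter(_ > 0) match {
      case Some(rate) =>
        // Split the total rate across partitions in proportion to their lag.
        val lagPerPartition = latest.map { case (tp, off) => tp -> math.max(off - current(tp), 0L) }
        val totalLag = lagPerPartition.values.sum
        lagPerPartition.map { case (tp, lag) =>
          val backpressureRate = math.round(lag.toDouble / totalLag * rate)
          tp -> (if (maxRatePerPartition > 0) math.min(backpressureRate, maxRatePerPartition)
                 else backpressureRate)
        }
      case None => latest.map { case (tp, _) => tp -> maxRatePerPartition }
    }
    if (effectiveRatePerPartition.values.sum > 0) {
      val secsPerBatch = batchMs.toDouble / 1000
      // rate (msgs/sec) * batch length (sec) = max messages for this batch
      Some(effectiveRatePerPartition.map { case (tp, limit) => tp -> (secsPerBatch * limit).toLong })
    } else None
  }

  // Ending offset = min(start + allowed messages, latest available offset).
  def clamp(
      latest: Map[TP, Long],
      current: Map[TP, Long],
      estimatedRate: Option[Double],
      maxRatePerPartition: Long,
      batchMs: Long): Map[TP, Long] =
    maxMessagesPerPartition(latest, current, estimatedRate, maxRatePerPartition, batchMs)
      .map(mmp => mmp.map { case (tp, messages) => tp -> math.min(current(tp) + messages, latest(tp)) })
      .getOrElse(latest)
}
```

As a worked example: with a 2-second batch, maxRatePerPartition = 100 and lags of 1000 and 50 records, each partition may read at most 200 messages, so the ending offsets are 200 and 50. With a backpressure rate of 300 records/s, the second partition's share is round(50 / 1050 * 300) = 14 records/s, i.e. 28 messages per batch.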


  def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
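    // Leader (host, port) of each topic-partition, used later for the preferred locations
    val leaders = untilOffsets.map { case (tp, lo) =>
        tp -> (lo.host, lo.port)
    }.toMap

    // One OffsetRange [fromOffset, untilOffset) per topic-partition
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
        val uo = untilOffsets(tp)
        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray

    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
  }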


KafkaRDD overrides the following RDD methods:
  • getPartitions
  • getPreferredLocations
  • compute
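What KafkaRDD.apply and the overridden getPartitions and getPreferredLocations amount to can be modelled as follows. A hypothetical sketch: RangeSpec and PartitionSpec are illustrative names (not Spark's OffsetRange or KafkaRDDPartition), and the preferred location is assumed to be the leader broker's host.

```scala
// Hypothetical model of KafkaRDD.apply + getPartitions/getPreferredLocations (not Spark's classes).
final case class RangeSpec(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)
final case class PartitionSpec(index: Int, range: RangeSpec, host: String, port: Int)

object KafkaRddPartitionsSketch {
  type TP = (String, Int)

  // untilOffsets carries (leaderHost, leaderPort, untilOffset), like LeaderOffset.
  def partitions(
      fromOffsets: Map[TP, Long],
      untilOffsets: Map[TP, (String, Int, Long)]): Array[PartitionSpec] =
    fromOffsets.toSeq.zipWithIndex.map { case (((topic, partition), from), index) =>
      val (host, port, until) = untilOffsets((topic, partition))
      // One RDD partition per Kafka topic-partition: the 1:1 mapping of the direct approach.
      PartitionSpec(index, RangeSpec(topic, partition, from, until), host, port)
    }.toArray

  // Prefer running each task on the leader broker's host.
  def preferredLocations(p: PartitionSpec): Seq[String] = Seq(p.host)
}
```

Since every RDD partition holds exactly one offset range, the partition count of each batch equals the number of Kafka partitions being consumed, until a shuffle or repartition breaks that mapping.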