
Two different ways to integrate Kafka 0.8 with Spark Streaming

I recently looked into how the integration with Spark Streaming differs across Kafka versions; the notes are organized below.

1- Kafka 0.8.2 and above, below Kafka 0.10

There are two approaches: one is receiver-based, the other has no receiver. They differ in programming model, performance characteristics and semantic guarantees.

1-1 Receiver-based approach

This approach uses Kafka's high-level API. Messages received from Kafka are stored in Spark executors, and a Spark Streaming job is then launched to process the data. The source-code analysis follows.

1-1-1 Overriding Receiver's onStart method

Initialize the block generator for storing Kafka message.

1-1-1-1 When the BlockGenerator is constructed, a recurring timer is created:

private val blockIntervalTimer = new RecurringTimer(clock, blockIntervalMs, 
updateCurrentBuffer, "BlockGenerator")

This timer thread periodically packs the received messages into a block. The rough flow is:
1. Construct the thread

private val thread = new Thread("RecurringTimer - " + name)

2. Call loop to start the loop

    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }

3. Call triggerActionForNextInterval, which invokes the callback (a higher-order function): updateCurrentBuffer

4. The callback converts the current buffer into a block and swaps in a new empty buffer to receive the next batch of data

  try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          // The block interval has elapsed: swap in a fresh empty buffer to receive the next
          // batch of data, and wrap the old buffer into a Block
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }
      if (newBlock != null) {
        blocksForPushing.put(newBlock) // put is blocking when queue is full
      }

As an aside, the block generator itself is created in ReliableKafkaReceiver.onStart via:

blockGenerator = supervisor.createBlockGenerator(new GeneratedBlockHandler)

1-1-1-2 The other thread in BlockGenerator is

private val blockPushingThread = new Thread() {
  override def run() {
    keepPushingBlocks()
  }
}

Its main job is to call the keepPushingBlocks method, which pushes blocks to the BlockManager.
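To make this two-thread hand-off concrete, here is a small self-contained toy (not the Spark source; all names are stand-ins and the "push" is just a println) showing the same pattern: a timer-driven cut of the current buffer into a block placed on a bounded queue, and a pushing thread that drains the queue:

import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object BlockHandOffSketch extends App {
  case class Block(id: Long, data: Seq[String])

  val blocksForPushing = new ArrayBlockingQueue[Block](10)
  var currentBuffer = new ArrayBuffer[String]
  @volatile var stopped = false

  // plays the role of updateCurrentBuffer: called by the timer every block interval
  def cutBlock(time: Long): Unit = synchronized {
    if (currentBuffer.nonEmpty) {
      val full = currentBuffer
      currentBuffer = new ArrayBuffer[String]
      blocksForPushing.put(Block(time, full)) // blocks when the queue is full
    }
  }

  // plays the role of blockPushingThread / keepPushingBlocks
  val pusher = new Thread("block-pusher") {
    override def run(): Unit = {
      while (!stopped || !blocksForPushing.isEmpty) {
        val b = blocksForPushing.poll(10, TimeUnit.MILLISECONDS)
        if (b != null) println(s"pushing block ${b.id} with ${b.data.size} records")
      }
    }
  }
  pusher.start()

  (1 to 3).foreach { i => currentBuffer += s"msg-$i"; cutBlock(System.currentTimeMillis()) }
  stopped = true
  pusher.join()
}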

1-1-1-3 Thread pool

The thread pool size is the sum of the thread counts of all topics:

messageHandlerThreadPool = ThreadUtils.newDaemonFixedThreadPool(
   topics.values.sum, "KafkaMessageHandler")

The key and value decoders are then constructed reflectively, for example the value decoder:

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(consumerConfig.props)
  .asInstanceOf[Decoder[V]]

1-1-1-4 Build the Kafka message streams, returning Map[String, List[KafkaStream[K, V]]]

val topicMessageStreams = consumerConnector.createMessageStreams(topics, keyDecoder, valueDecoder)

The method signature is:

  def createMessageStreams[K,V](topicCountMap: Map[String,Int],
                                keyDecoder: Decoder[K],
                                valueDecoder: Decoder[V])
    : Map[String,List[KafkaStream[K,V]]]

1-1-1-5 Submit to the thread pool to start consuming messages

Each List[KafkaStream] holds the multiple Kafka message streams of one topic; consumption is multi-threaded, which is why there is a list per topic.

topicMessageStreams.values.foreach { streams => // iterate over the lists of streams
  streams.foreach { stream => // iterate over each stream
    // start one thread per consumer Kafka stream to receive, i.e. consume, messages
    messageHandlerThreadPool.submit(new MessageHandler(stream))
  }
}

Let's see what the MessageHandler thread does.
For every message received, it calls storeMessageAndMetadata once:

private final class MessageHandler(stream: KafkaStream[K, V]) extends Runnable {
  override def run(): Unit = {
    while (!isStopped) {
      try {
        val streamIterator = stream.iterator()
        while (streamIterator.hasNext) {
          storeMessageAndMetadata(streamIterator.next)
        }
      } catch {
        case e: Exception =>
          reportError("Error handling message", e)
      }
    }
  }
}

The storeMessageAndMetadata method:


private def storeMessageAndMetadata(
    msgAndMetadata: MessageAndMetadata[K, V]): Unit = {
  val topicAndPartition = TopicAndPartition(msgAndMetadata.topic, msgAndMetadata.partition)
  val data = (msgAndMetadata.key, msgAndMetadata.message)
  val metadata = (topicAndPartition, msgAndMetadata.offset)
  blockGenerator.addDataWithCallback(data, metadata)
}

The addDataWithCallback method:

def addDataWithCallback(data: Any, metadata: Any): Unit = {
  if (state == Active) {
    waitToPush()
    synchronized {
      if (state == Active) {
        currentBuffer += data
        listener.onAddData(data, metadata)
      } else {
        throw new SparkException(
          "Cannot add data as BlockGenerator has not been started or has been stopped")
      }
    }
  } else {
    throw new SparkException(
      "Cannot add data as BlockGenerator has not been started or has been stopped")
  }
}

This stores data, i.e. one message, into the currentBuffer ArrayBuffer. The listener's onAddData call here goes to the onAddData method of GeneratedBlockHandler inside ReliableKafkaReceiver:

def onAddData(data: Any, metadata: Any): Unit = {
  // Update the offset of the data that was added to the generator
  if (metadata != null) {
    val (topicAndPartition, offset) = metadata.asInstanceOf[(TopicAndPartition, Long)]
    updateOffset(topicAndPartition, offset)
  }
}

The data and metadata arguments above are, respectively:

val data = (msgAndMetadata.key, msgAndMetadata.message)
val metadata = (topicAndPartition, msgAndMetadata.offset)

The updateOffset method stores the partition and offset into the topicPartitionOffsetMap, keeping track of the latest offset of each partition.
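For reference, updateOffset boils down to something like the following (a minimal sketch of the assumed shape, not the verbatim Spark source):

import scala.collection.mutable
import kafka.common.TopicAndPartition

// assumed bookkeeping shape: remember the latest offset seen for each topic/partition
// until the corresponding block has been stored
val topicPartitionOffsetMap = new mutable.HashMap[TopicAndPartition, Long]()

def updateOffset(topicAndPartition: TopicAndPartition, offset: Long): Unit = {
  topicPartitionOffsetMap.put(topicAndPartition, offset)
}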

The overall call chain is:

keepPushingBlocks:
     ->pushBlock
          ->GeneratedBlockHandler.onPushBlock
               ->storeBlockAndCommitOffset
                    ->store
                         ->ReceiverSupervisorImpl.pushArrayBuffer
                              ->pushAndReportBlock
                                   ->WriteAheadLogBasedBlockHandler.storeBlock (only succeeds when storing to both the BlockManager and the WAL succeeds)
                                   ->RpcEndpointRef.askWithRetry
                    ->commitOffset (updates the latest offset of every partition of every topic) // Question: why is an offset commit needed here? Isn't the commit supposed to happen manually once a batch finishes?
                      def consumerOffsetDir = consumerGroupDir + "/offsets/" + topic
                      def consumerOwnerDir = consumerGroupDir + "/owners/" + topic
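The likely answer to the question above: ReliableKafkaReceiver turns off the high-level consumer's automatic offset commit and commits offsets to ZooKeeper itself, and it does so only after the block (and the offsets recorded via onAddData) has been durably stored, which is what makes it "reliable". A simplified sketch of what such a commit amounts to, using the Kafka 0.8 client's ZKGroupTopicDirs and ZkUtils (assumed shape, not the verbatim Spark source):

import kafka.common.TopicAndPartition
import kafka.utils.{ZKGroupTopicDirs, ZkUtils}
import org.I0Itec.zkclient.ZkClient

// Write each partition's latest stored offset under the consumer group's offset path in
// ZooKeeper (the consumerOffsetDir layout shown above). Sketch only; error handling omitted.
def commitOffset(zkClient: ZkClient, groupId: String,
                 offsetMap: Map[TopicAndPartition, Long]): Unit = {
  for ((topicAndPartition, offset) <- offsetMap) {
    val topicDirs = new ZKGroupTopicDirs(groupId, topicAndPartition.topic)
    val zkPath = s"${topicDirs.consumerOffsetDir}/${topicAndPartition.partition}"
    ZkUtils.updatePersistentPath(zkClient, zkPath, offset.toString)
  }
}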

With the default configuration this approach risks losing data, so the WAL needs to be enabled; the received data is then additionally persisted to a write-ahead log on HDFS. The code looks like this:

import org.apache.spark.streaming.kafka._

 val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

The part that is easy to misread is "per-topic number of Kafka partitions to consume" (it is really the number of consumer threads per topic); look at the KafkaWordCount example in the Spark project:

val Array(zkQuorum, group, topics, numThreads) = args
val sparkConf = new SparkConf().setAppName("KafkaWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(2))
ssc.checkpoint("checkpoint")

//Note: multiple topics can be passed in, e.g. "topic1,topic2" with numThreads = 16, meaning both topics are consumed and 16 threads are started per topic to receive data
val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
//here topicMap is: ("topic1" -> 16, "topic2" -> 16)
val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)

Notes:
- Kafka topic partitions do not correspond to RDD partitions, so increasing a topic's partitions does not increase the processing parallelism.
- Multiple Kafka input DStreams can be created with different consumer groups and topics, so that several receivers pull data in parallel (see the sketch after this list).
- If the WAL is enabled and stored on HDFS, the received data is already replicated once in the log, so the storage level of such an input stream should be StorageLevel.MEMORY_AND_DISK_SER.
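As a sketch of the second note (not from the original post): several receiver-based streams can be created and unioned so that multiple receivers pull data in parallel. It reuses the ssc from the KafkaWordCount example above; the ZooKeeper quorum, group id, topic name and receiver count are placeholders.

// start three receivers, each with its own stream, then union them into one DStream
val numReceivers = 3
val kafkaStreams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zk1:2181,zk2:2181", "my-consumer-group", Map("topic1" -> 2))
}
val unifiedStream = ssc.union(kafkaStreams)
unifiedStream.map(_._2).count().print()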

1-2 Direct approach (no receiver)

This approach was introduced in Spark 1.3 to guarantee end-to-end consistency. It periodically queries Kafka for the latest offset of each topic+partition, which in turn defines the offset range each batch has to process. When the job that processes the data starts, Spark Streaming uses Kafka's simple consumer API to read the messages in the defined offset ranges directly, much like reading from a file system (seek to offset and read). Kafka's so-called simple API has three main capabilities:
- Read a message multiple times (re-read)
- Consume only a subset of the partitions in a topic in a process (read only selected partitions)
- Manage transactions to make sure a message is processed once and only once (exactly-once)

But this API is also quite complex:
- You must keep track of the offsets in your application to know where you left off consuming. (you manage offsets yourself)
- You must figure out which Broker is the lead Broker for a topic and partition
- You must handle Broker leader changes

The direct stream approach has the following advantages:
- Simplified parallelism. There is no need to create multiple Kafka streams and union them. With the direct stream, Spark Streaming creates as many RDD partitions as there are Kafka partitions, all of which read data from Kafka in parallel. The one-to-one mapping between RDD partitions and Kafka partitions is easy to understand and to tune.
- Efficiency. The first approach requires the data to be written to a WAL, which copies the data one more time, so there are two copies in total: one from Kafka and a second into the WAL. The direct approach does not have this problem.
- Exactly-once semantics. The first approach uses Kafka's high-level API to store offsets in ZooKeeper, which is the conventional way to consume Kafka. It can only guarantee zero data loss, i.e. at-least-once semantics: some records may be consumed twice. This happens when the offsets tracked by Spark Streaming get out of sync with the offsets stored in ZooKeeper. The direct approach therefore uses the simple Kafka API (no ZooKeeper dependency), and the offsets tracked by Spark Streaming are saved in the checkpoint, which removes that inconsistency. To actually achieve exactly-once semantics, the output operation that saves results to external storage must be idempotent, or must save the results and the offsets atomically (see "Semantics of output operations" for more information).
- Disadvantage. The only drawback is that offsets are not updated in ZooKeeper, so ZooKeeper-based Kafka monitoring tools cannot show the consumption progress. You can, however, update the offsets in ZooKeeper manually after each batch; a sketch is shown later in this section.

How to program with the direct approach:

 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

You can also pass a messageHandler to createDirectStream to get access to the MessageAndMetadata object, which contains the metadata of the current message, and from it build whatever type you want. See the API docs and the example.

In the Kafka parameters you must specify either metadata.broker.list or bootstrap.servers. By default, consumption starts from the latest offset of each Kafka partition. If auto.offset.reset is set to smallest in the configuration, consumption starts from the smallest offset instead, as the code below shows.

The method's scaladoc:

   * Points to note:
   *  - No receivers: This stream does not use any receiver. It directly queries Kafka
   *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked
   *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.
   *    You can access the offsets used in each batch from the generated RDDs (see
   *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).
   *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the `StreamingContext`. The information on consumed offset can be
   *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).
   *  - End-to-end semantics: This stream ensures that every records is effectively received and
   *    transformed exactly once, but gives no guarantees on whether the transformed data are
   *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure
   *    that the output operation is idempotent, or use transactions to output records atomically.
   *    See the programming guide for more details.

createDirectStream has many overloads; the one below is the final entry point.

  def createDirectStream[
    K: ClassTag,
    V: ClassTag,
    KD <: Decoder[K]: ClassTag,
    VD <: Decoder[V]: ClassTag] (
      ssc: StreamingContext,
      kafkaParams: Map[String, String],
      topics: Set[String]
  ): InputDStream[(K, V)] = {
    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)
    val kc = new KafkaCluster(kafkaParams)
    val fromOffsets = getFromOffsets(kc, kafkaParams, topics) // if fromOffsets is not specified by the caller, getFromOffsets is called to compute the starting offsets
    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](
      ssc, kafkaParams, fromOffsets, messageHandler)
  }
  private[kafka] def getFromOffsets(
      kc: KafkaCluster,
      kafkaParams: Map[String, String],
      topics: Set[String]
    ): Map[TopicAndPartition, Long] = {
    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)
    val result = for {
      topicPartitions <- kc.getPartitions(topics).right
      leaderOffsets <- (if (reset == Some("smallest")) { // if auto.offset.reset is "smallest", consume from the smallest offset of each topic-partition
        kc.getEarliestLeaderOffsets(topicPartitions)
      } else { // otherwise consume from the latest offsets
        kc.getLatestLeaderOffsets(topicPartitions)
      }).right
    } yield {
      leaderOffsets.map { case (tp, lo) =>
          (tp, lo.offset)
      }
    }
    KafkaCluster.checkErrors(result)
  }

Consumption can also start from arbitrary offsets by using a variant of KafkaUtils.createDirectStream. If you want to access the offsets processed in each batch, you can do this:

// Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

Now look at the implementation of DirectKafkaInputDStream. The key method is compute, because the RDD is constructed directly from the data read from Kafka.

  override def compute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {
    val untilOffsets = clamp(latestLeaderOffsets(maxRetries)) // throttles Spark Streaming according to the configured maximum rate; in effect this computes the end offsets
    val rdd = KafkaRDD[K, V, U, T, R](
      context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.
    val offsetRanges = currentOffsets.map { case (tp, fo) =>
      val uo = untilOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }
    val description = offsetRanges.filter { offsetRange =>
      // Don't display empty ranges.
      offsetRange.fromOffset != offsetRange.untilOffset
    }.map { offsetRange =>
      s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
        s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
    }.mkString("\n")
    // Copy offsetRanges to immutable.List to prevent from being modified by the user
    val metadata = Map(
      "offsets" -> offsetRanges.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
    val inputInfo = StreamInputInfo(id, rdd.count, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)
    Some(rdd)
  }

Likewise, the offsets in ZooKeeper can be updated manually.
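One common way to do that (a hedged sketch, not code from the original post): grab the offset ranges in foreachRDD and write each partition's untilOffset to the consumer group's offset path in ZooKeeper with the Kafka 0.8 client's ZkUtils. The ZooKeeper address and the group id below are placeholders.

import kafka.utils.{ZKGroupTopicDirs, ZKStringSerializer, ZkUtils}
import org.I0Itec.zkclient.ZkClient
import org.apache.spark.streaming.kafka.HasOffsetRanges

val zkClient = new ZkClient("zk1:2181", 30000, 30000, ZKStringSerializer) // placeholder quorum
val groupId = "my-consumer-group"                                          // placeholder group id

directKafkaStream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the rdd first, then record where this batch ended ...
  offsetRanges.foreach { o =>
    val topicDirs = new ZKGroupTopicDirs(groupId, o.topic)
    ZkUtils.updatePersistentPath(zkClient,
      s"${topicDirs.consumerOffsetDir}/${o.partition}", o.untilOffset.toString)
  }
}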

Note that the typecast to HasOffsetRanges will only succeed if it is done in the first method called on the directKafkaStream, not later down a chain of methods. You can use transform() instead of foreachRDD() as your first method call in order to access offsets, then call further Spark methods. However, be aware that the one-to-one mapping between RDD partition and Kafka partition does not remain after any methods that shuffle or repartition, e.g. reduceByKey() or window().

Another thing to note is that since this approach does not use Receivers, the standard receiver-related configurations (that is, configurations of the form spark.streaming.receiver.*) will not apply to the input DStreams created by this approach (they will apply to other input DStreams though). Instead, use the configurations spark.streaming.kafka.*. An important one is spark.streaming.kafka.maxRatePerPartition which is the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API.
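For example (a minimal illustration; the application name and the limit value are arbitrary):

// cap the direct stream at 1000 messages per second per Kafka partition
val conf = new SparkConf()
  .setAppName("DirectKafkaExample")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")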

Now let's focus on the getLeaderOffsets method mentioned above. Whether consumption starts from the latest or the earliest offsets, this method is called, just with different arguments:

  def getLatestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.LatestTime) // the latest point in time

  def getEarliestLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition]
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] =
    getLeaderOffsets(topicAndPartitions, OffsetRequest.EarliestTime) // the earliest point in time

  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long
    ): Either[Err, Map[TopicAndPartition, LeaderOffset]] = {
    getLeaderOffsets(topicAndPartitions, before, 1).right.map { r =>
      r.map { kv =>
        // mapValues isn't serializable, see SI-7005
        kv._1 -> kv._2.head
      }
    }
  }

As you can see, getLeaderOffsets delegates to another getLeaderOffsets overload, shown below:

  def getLeaderOffsets(
      topicAndPartitions: Set[TopicAndPartition],
      before: Long,
      maxNumOffsets: Int
    ): Either[Err, Map[TopicAndPartition, Seq[LeaderOffset]]] = {
    findLeaders(topicAndPartitions).right.flatMap { tpToLeader =>//Find an active Broker and find out which Broker is the leader for your topic and partition
      val leaderToTp: Map[(String, Int), Seq[TopicAndPartition]] = flip(tpToLeader)
      val leaders = leaderToTp.keys
      var result = Map[TopicAndPartition, Seq[LeaderOffset]]()
      val errs = new Err
      withBrokers(leaders, errs) { consumer =>
        val partitionsToGetOffsets: Seq[TopicAndPartition] =
          leaderToTp((consumer.host, consumer.port))
        val reqMap = partitionsToGetOffsets.map { tp: TopicAndPartition =>
          tp -> PartitionOffsetRequestInfo(before, maxNumOffsets)
        }.toMap
        val req = OffsetRequest(reqMap)
        val resp = consumer.getOffsetsBefore(req)
        val respMap = resp.partitionErrorAndOffsets
        partitionsToGetOffsets.foreach { tp: TopicAndPartition =>
          respMap.get(tp).foreach { por: PartitionOffsetsResponse =>
            if (por.error == ErrorMapping.NoError) {
              if (por.offsets.nonEmpty) {
                result += tp -> por.offsets.map { off =>
                  LeaderOffset(consumer.host, consumer.port, off)
                }
              } else {
                errs += new SparkException(
                  s"Empty offsets for ${tp}, is ${before} before log beginning?")
              }
            } else {
              errs += ErrorMapping.exceptionFor(por.error)
            }
          }
        }
        if (result.keys.size == topicAndPartitions.size) {
          return Right(result)
        }
      }
      val missing = topicAndPartitions.diff(result.keySet)
      errs += new SparkException(s"Couldn't find leader offsets for ${missing}")
      Left(errs)
    }
  }

Now look at the clamp method used in compute ("clamp" meaning to restrict or cap):

  // limits the maximum number of messages per partition
  protected def clamp(
    leaderOffsets: Map[TopicAndPartition, LeaderOffset]): Map[TopicAndPartition, LeaderOffset] = {
    val offsets = leaderOffsets.mapValues(lo => lo.offset)

    maxMessagesPerPartition(offsets).map { mmp => // call maxMessagesPerPartition to compute the maximum number of messages each partition may consume
      mmp.map { case (tp, messages) =>
        val lo = leaderOffsets(tp)
        tp -> lo.copy(offset = Math.min(currentOffsets(tp) + messages, lo.offset))
      }
    }.getOrElse(leaderOffsets)
  }

The maxMessagesPerPartition method:

  protected[streaming] def maxMessagesPerPartition(
      offsets: Map[TopicAndPartition, Long]): Option[Map[TopicAndPartition, Long]] = {
    val estimatedRateLimit = rateController.map(_.getLatestRate())

    // calculate a per-partition rate limit based on current lag
    val effectiveRateLimitPerPartition = estimatedRateLimit.filter(_ > 0) match {
      case Some(rate) =>
        val lagPerPartition = offsets.map { case (tp, offset) =>
          tp -> Math.max(offset - currentOffsets(tp), 0)
        }
        val totalLag = lagPerPartition.values.sum

        lagPerPartition.map { case (tp, lag) =>
          val backpressureRate = Math.round(lag / totalLag.toFloat * rate)
          tp -> (if (maxRateLimitPerPartition > 0) {
            Math.min(backpressureRate, maxRateLimitPerPartition)} else backpressureRate)
        }
      case None => offsets.map { case (tp, offset) => tp -> maxRateLimitPerPartition }
    }

    if (effectiveRateLimitPerPartition.values.sum > 0) {
      val secsPerBatch = context.graph.batchDuration.milliseconds.toDouble / 1000 // batch duration times the per-partition limit gives the maximum number of messages this partition may consume; adding that to the start offset gives the end offset
      Some(effectiveRateLimitPerPartition.map {
        case (tp, limit) => tp -> (secsPerBatch * limit).toLong
      })
    } else {
      None
    }
  }
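To make the arithmetic concrete, here is a small worked example with hypothetical numbers (the rate estimate, the per-partition lags and maxRateLimitPerPartition are all made up for illustration):

// rate from the rate controller: 1000 msgs/sec; batch duration: 2 s;
// spark.streaming.kafka.maxRatePerPartition: 300
val rate = 1000.0
val currentLag = Map("t-0" -> 6000L, "t-1" -> 2000L)  // lag per partition
val totalLag = currentLag.values.sum                   // 8000
val maxRateLimitPerPartition = 300
val secsPerBatch = 2.0

val maxMessagesThisBatch = currentLag.map { case (tp, lag) =>
  val backpressureRate = Math.round(lag / totalLag.toFloat * rate)   // t-0 -> 750, t-1 -> 250
  val limited = Math.min(backpressureRate, maxRateLimitPerPartition) // t-0 -> 300, t-1 -> 250
  tp -> (secsPerBatch * limited).toLong // t-0 -> 600, t-1 -> 500 messages allowed this batch
}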

Once the start and end offsets each partition may consume have been computed, the KafkaRDD can be constructed. It comes as a companion object plus a companion class; first look at the companion object's apply method:

  def apply[
    K: ClassTag,
    V: ClassTag,
    U <: Decoder[_]: ClassTag,
    T <: Decoder[_]: ClassTag,
    R: ClassTag](
      sc: SparkContext,
      kafkaParams: Map[String, String],
      fromOffsets: Map[TopicAndPartition, Long],
      untilOffsets: Map[TopicAndPartition, LeaderOffset],
      messageHandler: MessageAndMetadata[K, V] => R
    ): KafkaRDD[K, V, U, T, R] = {
    val leaders = untilOffsets.map { case (tp, lo) =>
        tp -> (lo.host, lo.port)
    }.toMap

    //build the OffsetRanges from fromOffsets (the start offsets) and untilOffsets (the end offsets)
    val offsetRanges = fromOffsets.map { case (tp, fo) =>
        val uo = untilOffsets(tp)
        OffsetRange(tp.topic, tp.partition, fo, uo.offset)
    }.toArray

    //construct the KafkaRDD from the offset ranges
    //note: distinguish RDD.compute from DStream.compute; KafkaRDD's compute method is analyzed below
    new KafkaRDD[K, V, U, T, R](sc, kafkaParams, offsetRanges, leaders, messageHandler)
  }

A typical RDD subclass needs to override the following methods:

  • getPartitions
  • getPreferredLocations
  • compute

KafkaRDD's compute method mainly returns a KafkaRDDIterator; each call to the iterator's getNext method returns one message read from the Kafka cluster. A sketch of the underlying idea follows.
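What getNext boils down to is a simple-consumer fetch over the partition's offset range. The following self-contained sketch (not the actual KafkaRDDIterator code; the host, port and topic are placeholders, and the leader-change and error handling the real code performs is omitted) shows what "seek to offset and read" means with Kafka 0.8's SimpleConsumer:

import kafka.api.FetchRequestBuilder
import kafka.consumer.SimpleConsumer

// Fetch the messages of one partition between fromOffset (inclusive) and untilOffset (exclusive)
// from the partition's leader broker.
def readRange(leaderHost: String, leaderPort: Int, topic: String, partition: Int,
              fromOffset: Long, untilOffset: Long): Unit = {
  val consumer = new SimpleConsumer(leaderHost, leaderPort, 30000, 64 * 1024, "kafka-rdd-sketch")
  try {
    var offset = fromOffset
    while (offset < untilOffset) {
      val req = new FetchRequestBuilder()
        .addFetch(topic, partition, offset, 64 * 1024)
        .build()
      val messages = consumer.fetch(req).messageSet(topic, partition)
      if (messages.isEmpty) return // nothing more to read (the real code retries or reports an error)
      for (msgAndOffset <- messages if msgAndOffset.offset < untilOffset) {
        // this is where KafkaRDD applies messageHandler to build the RDD element
        offset = msgAndOffset.nextOffset
      }
    }
  } finally {
    consumer.close()
  }
}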