1. 程式人生 > >40:Spark Streaming中KafkaReceiver內幕實現徹底解密

40:Spark Streaming中KafkaReceiver內幕實現徹底解密

本期內容:

1. KafkaInputDStream原始碼解密

2. KafkaReceiver原始碼解密

Direct方式,是No Receiver方式,和普通Receiver方式,最大的區別,是元資料的管理方式。
Direct方式是沒有通過zookeeper,由應用自身來管理。

KafkaUtils.createDirectStream:   /**    * Create an input stream that directly pulls messages from Kafka Brokers    * without using any receiver. This stream can guarantee that each message
   * from Kafka is included in transformations exactly once (see points below).    *    * Points to note:    *  - No receivers: This stream does not use any receiver. It directly queries Kafka    *  - Offsets: This does not use Zookeeper to store offsets. The consumed offsets are tracked    *    by the stream itself. For interoperability with Kafka monitoring tools that depend on
   *    Zookeeper, you have to update Kafka/Zookeeper yourself from the streaming application.    *    You can access the offsets used in each batch from the generated RDDs (see    *    [[org.apache.spark.streaming.kafka.HasOffsetRanges]]).    *  - Failure Recovery: To recover from driver failures, you have to enable checkpointing
   *    in the [[StreamingContext]]. The information on consumed offset can be    *    recovered from the checkpoint. See the programming guide for details (constraints, etc.).    *  - End-to-end semantics: This stream ensures that every records is effectively received and    *    transformed exactly once, but gives no guarantees on whether the transformed data are    *    outputted exactly once. For end-to-end exactly-once semantics, you have to either ensure    *    that the output operation is idempotent, or use transactions to output records atomically.    *    See the programming guide for more details.    *    * @param ssc StreamingContext object    *    configuration parameters</a>. Requires "metadata.broker.list" or "bootstrap.servers"    *    to be set with Kafka broker(s) (NOT zookeeper servers) specified in    *    host1:port1,host2:port2 form.    * @param fromOffsets Per-topic/partition Kafka offsets defining the (inclusive)    *    starting point of the stream    * @param messageHandler Function for translating each message and metadata into the desired type    * @tparam K type of Kafka message key    * @tparam V type of Kafka message value    * @tparam KD type of Kafka message key decoder    * @tparam VD type of Kafka message value decoder    * @tparam R type returned by messageHandler    * @return DStream of R    */   def createDirectStream[     K: ClassTag,     V: ClassTag,     KD <: Decoder[K]: ClassTag,     VD <: Decoder[V]: ClassTag,     R: ClassTag] (       ssc: StreamingContext,       kafkaParams: Map[String, String],       fromOffsets: Map[TopicAndPartition, Long],       messageHandler: MessageAndMetadata[K, V] => R   ): InputDStream[R] = {     val cleanedHandler = ssc.sc.clean(messageHandler)     new DirectKafkaInputDStream[K, V, KD, VD, R](       ssc, kafkaParams, fromOffsets, cleanedHandler)   }

我們來看看Receiver方式。

KafkaUtils.createStream:

  /**    * Create an input stream that pulls messages from Kafka Brokers.    * @param ssc       StreamingContext object    * @param zkQuorum  Zookeeper quorum (hostname:port,hostname:port,..)    * @param groupId   The group id for this consumer    * @param topics    Map of (topic_name -> numPartitions) to consume. Each partition is consumed    *                  in its own thread    * @param storageLevel  Storage level to use for storing the received objects    *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)    * @return DStream of (Kafka message key, Kafka message value)    */   def createStream(       ssc: StreamingContext,       zkQuorum: String,       groupId: String,       topics: Map[String, Int],       storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2     ): ReceiverInputDStream[(String, String)] = {     val kafkaParams = Map[String, String](       "zookeeper.connect" -> zkQuorum, "group.id" -> groupId,       "zookeeper.connection.timeout.ms" -> "10000")     createStream[String, String, StringDecoder, StringDecoder](       ssc, kafkaParams, topics, storageLevel)   } 在Receiver的工廠方法,有一些比較重要的引數:
  1. zkQuorum,就是zookeeper的地址,一般是奇數個。資料是儲存在broker中的,所以只是從zookeeper去查詢我們需要的資料在哪裡,由zookeeper來管理offset等元資料的資訊。
  2. groupId,sparkStreaming在消費kafka的資料時,是分group的,當進行不同業務型別消費時,會很需要。
  3. topics,表明消費的內容,每個partition有個單獨的執行緒來抓取資料。
  4. storageLevel,儲存級別,模式是MEMORY_AND_DISK_SER_2,記憶體放的下放在記憶體,否則放磁碟,所以不用擔心記憶體不夠的問題。
我們可以看到,Receiver方式需要傳入zookeeper的地址。

KafkaReceiver

根據前面的課程,我們知道InputDStream最終都會建立一個Receiver物件來工作,在這個功能中,就是KakfaReceiver。

在onStart方法中,最為關鍵的就是建立consumerConnector。

KafkaInputDStream.onStart:  def onStart() {     logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id"))     // Kafka connection properties     val props = new Properties()     kafkaParams.foreach(param => props.put(param._1, param._2))     val zkConnect = kafkaParams("zookeeper.connect")     // Create the connection to the cluster     logInfo("Connecting to Zookeeper: " + zkConnect)     val consumerConfig = new ConsumerConfig(props)     consumerConnector = Consumer.create(consumerConfig)     logInfo("Connected to " + zkConnect)     val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])       .newInstance(consumerConfig.props)       .asInstanceOf[Decoder[K]]     val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])       .newInstance(consumerConfig.props)       .asInstanceOf[Decoder[V]]     // Create threads for each topic/message Stream we are listening     val topicMessageStreams = consumerConnector.createMessageStreams(       topics, keyDecoder, valueDecoder)     val executorPool =       ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler")     try {       // Start the messages handler for each partition       topicMessageStreams.values.foreach { streams =>         streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) }       }     } finally {       executorPool.shutdown() // Just causes threads to terminate after work is done     }   }

內部會生成一個zookeeperConsumerConnector,這是一個門面模式,封裝了與zookeeper溝通的細節。在其中,最關鍵的是呼叫了下面三個方法。
也就是,建立zk連線,建立fetcher,並且將zk中的元資料與fetcher進行連線。

然後,是根據consumer連線來獲取stream,consumer獲取資料過程前面已經完整介紹過,這裡就不重複說明。

最後,會跟據監聽的不同的topic,開啟執行緒,每一個執行緒中都放一個MessageHandler。

MessageHandler裡面的功能就是取出資料,然後store給spark。

至此,完成了資料獲取。