40:Spark Streaming中KafkaReceiver內幕實現徹底解密
阿新 • • 發佈:2018-12-31
本期內容:
1. KafkaInputDStream原始碼解密
2. KafkaReceiver原始碼解密
Direct方式,是No Receiver方式,和普通Receiver方式,最大的區別,是元資料的管理方式。
Direct方式是沒有通過zookeeper,由應用自身來管理。
我們來看看Receiver方式。
KafkaUtils.createStream:
/** * Create an input stream that pulls messages from Kafka Brokers. * @param ssc StreamingContext object * @param zkQuorum Zookeeper quorum (hostname:port,hostname:port,..) * @param groupId The group id for this consumer * @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed * in its own thread * @param storageLevel Storage level to use for storing the received objects * (default: StorageLevel.MEMORY_AND_DISK_SER_2) * @return DStream of (Kafka message key, Kafka message value) */ def createStream( ssc: StreamingContext, zkQuorum: String, groupId: String, topics: Map[String, Int], storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2 ): ReceiverInputDStream[(String, String)] = { val kafkaParams = Map[String, String]( "zookeeper.connect" -> zkQuorum, "group.id" -> groupId, "zookeeper.connection.timeout.ms" -> "10000") createStream[String, String, StringDecoder, StringDecoder]( ssc, kafkaParams, topics, storageLevel) } 在Receiver的工廠方法,有一些比較重要的引數:- zkQuorum,就是zookeeper的地址,一般是奇數個。資料是儲存在broker中的,所以只是從zookeeper去查詢我們需要的資料在哪裡,由zookeeper來管理offset等元資料的資訊。
- groupId,sparkStreaming在消費kafka的資料時,是分group的,當進行不同業務型別消費時,會很需要。
- topics,表明消費的內容,每個partition有個單獨的執行緒來抓取資料。
- storageLevel,儲存級別,模式是MEMORY_AND_DISK_SER_2,記憶體放的下放在記憶體,否則放磁碟,所以不用擔心記憶體不夠的問題。
KafkaReceiver
根據前面的課程,我們知道InputDStream最終都會建立一個Receiver物件來工作,在這個功能中,就是KakfaReceiver。
在onStart方法中,最為關鍵的就是建立consumerConnector。
KafkaInputDStream.onStart: def onStart() { logInfo("Starting Kafka Consumer Stream with group: " + kafkaParams("group.id")) // Kafka connection properties val props = new Properties() kafkaParams.foreach(param => props.put(param._1, param._2)) val zkConnect = kafkaParams("zookeeper.connect") // Create the connection to the cluster logInfo("Connecting to Zookeeper: " + zkConnect) val consumerConfig = new ConsumerConfig(props) consumerConnector = Consumer.create(consumerConfig) logInfo("Connected to " + zkConnect) val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[K]] val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) .newInstance(consumerConfig.props) .asInstanceOf[Decoder[V]] // Create threads for each topic/message Stream we are listening val topicMessageStreams = consumerConnector.createMessageStreams( topics, keyDecoder, valueDecoder) val executorPool = ThreadUtils.newDaemonFixedThreadPool(topics.values.sum, "KafkaMessageHandler") try { // Start the messages handler for each partition topicMessageStreams.values.foreach { streams => streams.foreach { stream => executorPool.submit(new MessageHandler(stream)) } } } finally { executorPool.shutdown() // Just causes threads to terminate after work is done } }
內部會生成一個zookeeperConsumerConnector,這是一個門面模式,封裝了與zookeeper溝通的細節。在其中,最關鍵的是呼叫了下面三個方法。
也就是,建立zk連線,建立fetcher,並且將zk中的元資料與fetcher進行連線。
然後,是根據consumer連線來獲取stream,consumer獲取資料過程前面已經完整介紹過,這裡就不重複說明。
最後,會跟據監聽的不同的topic,開啟執行緒,每一個執行緒中都放一個MessageHandler。
MessageHandler裡面的功能就是取出資料,然後store給spark。
至此,完成了資料獲取。