Kafka 的 consumerConnector.createMessageStreams 方法原始碼分析
阿新 • • 發佈:2019-02-17
// Build the connector from the consumer configuration and keep a reference to it,
// because the message streams are requested from that same connector below.
val consumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(
  topics, keyDecoder, valueDecoder)
這裡的createMessageStreams呼叫的是子類ZookeeperConsumerConnector 的實現:
/**
 * Creates the message streams for the requested topics by delegating to `consume`.
 *
 * This may succeed only once per instance: the `messageStreamCreated` flag is
 * atomically set to true on entry, and any call that finds it already set fails.
 *
 * @param topicCountMap topic name -> count, forwarded unchanged to `consume`
 * @param keyDecoder    decoder for message keys, forwarded to `consume`
 * @param valueDecoder  decoder for message values, forwarded to `consume`
 * @return the topic -> list-of-streams map produced by `consume`
 * @throws MessageStreamsExistException if message streams were already created
 */
def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V] )
  : Map[String, List[KafkaStream[K,V]]] = {
  // getAndSet returns the previous value, so `true` means an earlier call already ran.
  if (messageStreamCreated.getAndSet(true))
    throw new MessageStreamsExistException(this.getClass.getSimpleName +
      " can create message streams at most once",null)
  consume(topicCountMap, keyDecoder, valueDecoder)
}
consume方法:
/**
 * Sets up one (queue, stream) pair per consumer thread id, registers the consumer
 * and returns the streams held by the load-balancer listener.
 *
 * @param topicCountMap topic name -> count used to derive the consumer thread ids
 * @param keyDecoder    decoder for message keys, handed to every `KafkaStream`
 * @param valueDecoder  decoder for message values, handed to every `KafkaStream`
 * @return `loadBalancerListener.kafkaMessageAndMetadataStreams`, cast to the
 *         key/value types of this call
 * @throws RuntimeException if `topicCountMap` is null
 */
def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
  : Map[String,List[KafkaStream[K,V]]] = {
  debug("entering consume ")
  if (topicCountMap == null)
    throw new RuntimeException("topicCountMap is null" )

  val topicCount = TopicCount.constructTopicCount(consumerIdString, topicCountMap)

  val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic

  // make a list of (queue,stream) pairs, one pair for each threadId
  val queuesAndStreams = topicThreadIds.values.map(threadIdSet =>
    threadIdSet.map(_ => {
      // Each thread id gets its own blocking queue (capacity config.queuedMaxMessages),
      // and a KafkaStream is built on top of that queue.
      val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
      val stream = new KafkaStream[K,V](
        queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId)
      (queue, stream)
    })
  ).flatten.toList
  // Before flattening: ((queue1,stream1),(queue2,stream2)),((queue3,stream3),(queue4,stream4))
  // After flattening:  ((queue1,stream1),(queue2,stream2),(queue3,stream3),(queue4,stream4))

  val dirs = new ZKGroupDirs(config.groupId)
  registerConsumerInZK(dirs, consumerIdString, topicCount)
  reinitializeConsumer(topicCount, queuesAndStreams)

  loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]]
}
getConsumerThreadIdsPerTopic方法,實際呼叫的是TopicCount的makeConsumerThreadIdsPerTopic方法:
/**
 * Builds, for every topic in `topicCountMap`, the set of consumer thread ids
 * that belong to the given consumer.
 *
 * For a topic mapped to `n`, the set holds `ConsumerThreadId(consumerIdString, i)`
 * for each `i` from 0 up to `n - 1`. Every count is asserted to be at least 1.
 *
 * @param consumerIdString consumer id passed into each `ConsumerThreadId`
 * @param topicCountMap    topic name -> number of thread ids to create for it
 * @return a mutable map from topic name to that topic's set of thread ids
 */
def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                  topicCountMap: Map[String, Int]) = {
  val threadIdsByTopic = new mutable.HashMap[String, Set[ConsumerThreadId]]()
  topicCountMap.foreach { case (topic, threadCount) =>
    val threadIds = new mutable.HashSet[ConsumerThreadId]
    assert(threadCount >= 1)
    // One thread id per configured thread, numbered from 0.
    (0 until threadCount).foreach { index =>
      threadIds += ConsumerThreadId(consumerIdString, index)
    }
    threadIdsByTopic.put(topic, threadIds)
  }
  threadIdsByTopic
}