1. 程式人生 > >kafka的consumerConnector.createMessageStreams 方法原始碼分析

kafka的consumerConnector.createMessageStreams 方法原始碼分析

Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(
topics, keyDecoder, valueDecoder)
這裡的createMessageStreams呼叫的是子類ZookeeperConsumerConnector 的實現:

  def createMessageStreams[K,V](topicCountMap: Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V]
) : Map[String, List[KafkaStream[K,V]]] = { if (messageStreamCreated.getAndSet(true)) throw new MessageStreamsExistException(this.getClass.getSimpleName + " can create message streams at most once",null) **consume**(topicCountMap, keyDecoder, valueDecoder)
}

consume方法:

  def consume[K, V](topicCountMap: scala.collection.Map[String,Int], keyDecoder: Decoder[K], valueDecoder: Decoder[V])
      : Map[String,List[KafkaStream[K,V]]] = {
    debug("entering consume ")
    if (topicCountMap == null)
      throw new RuntimeException("topicCountMap is null"
) //這個方法下邊分析 val topicCount = TopicCount.**constructTopicCount**(consumerIdString, topicCountMap) val topicThreadIds = topicCount.getConsumerThreadIdsPerTopic // make a list of (queue,stream) pairs, one pair for each threadId val queuesAndStreams = topicThreadIds.values.map(threadIdSet => threadIdSet.map(_ => {//每個執行緒一個佇列,用阻塞佇列構造一個KafkaStream val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) ).flatten.toList //本來是((queue1,stream1),(queue2,stream1)),((queue3,stream3),(queue4,stream4)),壓平成: //((queue1,stream1),(queue2,stream1),(queue3,stream3),(queue4,stream4)) val dirs = new ZKGroupDirs(config.groupId) registerConsumerInZK(dirs, consumerIdString, topicCount) reinitializeConsumer(topicCount, queuesAndStreams) loadBalancerListener.kafkaMessageAndMetadataStreams.asInstanceOf[Map[String, List[KafkaStream[K,V]]]] }

getConsumerThreadIdsPerTopic方法,實際呼叫的是TopicCount的makeConsumerThreadIdsPerTopic方法:

  def makeConsumerThreadIdsPerTopic(consumerIdString: String,
                                    topicCountMap: Map[String,  Int]) = {
    val consumerThreadIdsPerTopicMap = new mutable.HashMap[String, Set[ConsumerThreadId]]()
    for ((topic, nConsumers) <- topicCountMap) {//迴圈每個topic和給這個topic配置的執行緒數
      val consumerSet = new mutable.HashSet[ConsumerThreadId]
      assert(nConsumers >= 1)
      for (i <- 0 until nConsumers) //為每個執行緒建立一個執行緒id
        consumerSet += ConsumerThreadId(consumerIdString, i) 
      consumerThreadIdsPerTopicMap.put(topic, consumerSet)
    }
    consumerThreadIdsPerTopicMap
  }