1. 程式人生 > >深入學習Kafka:Topic的刪除過程分析

深入學習Kafka:Topic的刪除過程分析

要刪除Topic,需要執行下面命令:

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

這裡假設zookeeper地址為localhost,要刪除的topic是test,這條命令實際上是在zookeeper的節點/admin/delete_topics下建立一個節點test,節點名為topic名字。(很多博文中說這個節點時臨時的,其實不是,是個持久節點,直到topic真正刪除時,才會被controller刪除)
執行這段命令後控制檯輸出
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
也就是說執行刪除命令,不是真正刪除,而是標記刪除,在zookeeper上新增/admin/delete_topics/test節點,也提醒了我們,需要提前開啟delete.topic.enable開關。

Kafka刪除Topic的原始碼分析

在Kafka中,Topic的刪除是靠DeleteTopicManager類來完成的。
當Broker被選舉成叢集Leader之後,KafkaController中的onControllerFailover會被呼叫,在該方法中會呼叫deleteTopicManager.start()來啟動刪除Topic的執行緒。
而當Broker不再成為叢集Leader時,KafkaController中的onControllerResignation會被呼叫,在該方法中會呼叫deleteTopicManager.shutdown()來關閉刪除Topic的執行緒。

在KafkaController的onControllerFailover方法中,初始化了partitionStateMachine狀態機,並註冊了相應的事件監聽器,主要是監聽zookeeper節點/admin/delete_topics下子節點的變化。

  def onControllerFailover() {
    if(isRunning) {
      // ...
      partitionStateMachine.registerListeners()
      replicaStateMachine.registerListeners()
      // ...
      deleteTopicManager.start()
    }
    else
info("Controller has been shut down, aborting startup/failover") }
class PartitionStateMachine{
  def registerListeners() {
    registerTopicChangeListener()
    if(controller.config.deleteTopicEnable)
      //註冊事件監聽,關注節點/admin/delete_topics下子節點的變化
      registerDeleteTopicListener()
  }

  private def registerDeleteTopicListener() = {
    zkUtils.zkClient.subscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  }

  private def deregisterDeleteTopicListener() = {
    zkUtils.zkClient.unsubscribeChildChanges(DeleteTopicsPath, deleteTopicsListener)
  }
}

kafka.controller.PartitionStateMachine.DeleteTopicsListener

DeleteTopicsListener將監聽zookeeper節點/admin/delete_topics下子節點的變化,當有childChange,即有新的topic需要被刪除時,該handleChildChange會被觸發,將該topic加入到deleteTopicManager的queue中

  class DeleteTopicsListener() extends IZkChildListener with Logging {
    this.logIdent = "[DeleteTopicsListener on " + controller.config.brokerId + "]: "
    val zkUtils = controllerContext.zkUtils

    /**
     * Invoked when a topic is being deleted
     * @throws Exception On any error.
     */
    @throws(classOf[Exception])
    def handleChildChange(parentPath : String, children : java.util.List[String]) {
      //監聽zookeeper節點/admin/delete_topics下子節點的變化,當有childChange,即有新的topic需要被刪除時,該handleChildChange會被觸發
      inLock(controllerContext.controllerLock) {
        var topicsToBeDeleted = {
          import JavaConversions._
          (children: Buffer[String]).toSet
        }
        debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
        //查詢Topic是否存在,若topic已經不存在了,則直接刪除/admin/delete_topics/< topic_name >節點
        val nonExistentTopics = topicsToBeDeleted.filter(t => !controllerContext.allTopics.contains(t))
        if(nonExistentTopics.size > 0) {
          warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
          nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
        }
        topicsToBeDeleted --= nonExistentTopics
        if(topicsToBeDeleted.size > 0) {
          info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
          // mark topic ineligible for deletion if other state changes are in progress
          // 查詢topic是否為當前正在執行Preferred副本選舉或分割槽重分配,若果是,則標記為暫時不適合被刪除。
          topicsToBeDeleted.foreach { topic =>
            val preferredReplicaElectionInProgress =
              controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
            val partitionReassignmentInProgress =
              controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
            if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
              controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
          }
          // add topic to deletion list
          //新增topic到待刪除的queue中
          controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
        }
      }
    }

    /**
     *
     * @throws Exception
   *             On any error.
     */
    @throws(classOf[Exception])
    def handleDataDeleted(dataPath: String) {
    }
  }

TopicDeletionManager

class TopicDeletionManager(controller: KafkaController,
                           initialTopicsToBeDeleted: Set[String] = Set.empty,
                           initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {
  this.logIdent = "[Topic Deletion Manager " + controller.config.brokerId + "], "
  val controllerContext = controller.controllerContext
  //partition狀態機
  val partitionStateMachine = controller.partitionStateMachine
  //replica狀態機
  val replicaStateMachine = controller.replicaStateMachine
  val topicsToBeDeleted: mutable.Set[String] = mutable.Set.empty[String] ++ initialTopicsToBeDeleted
  val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)
  val deleteLock = new ReentrantLock()
  val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++
    (initialTopicsIneligibleForDeletion & initialTopicsToBeDeleted)
  val deleteTopicsCond = deleteLock.newCondition()
  val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
  var deleteTopicsThread: DeleteTopicsThread = null
  val isDeleteTopicEnabled = controller.config.deleteTopicEnable

  /**
   * Invoked at the end of new controller initiation
   */
  def start() {
    if (isDeleteTopicEnabled) {
      //如果topic.delete.enable=true,則啟動刪除執行緒
      deleteTopicsThread = new DeleteTopicsThread()
      if (topicsToBeDeleted.size > 0)
        deleteTopicStateChanged.set(true)
      deleteTopicsThread.start()
    }
  }

  /**
   * Invoked when the current controller resigns. At this time, all state for topic deletion should be cleared.
   */
  def shutdown() {
    // Only allow one shutdown to go through
    if (isDeleteTopicEnabled && deleteTopicsThread.initiateShutdown()) {
      // Resume the topic deletion so it doesn't block on the condition
      //此時刪除執行緒有可能處於等待狀態,即awaitTopicDeletionNotification方法處於阻塞等待狀態,則喚醒該刪除執行緒
      resumeTopicDeletionThread()
      // Await delete topic thread to exit
      //等待刪除執行緒doWork執行結束
      deleteTopicsThread.awaitShutdown()
      //清除資源
      topicsToBeDeleted.clear()
      partitionsToBeDeleted.clear()
      topicsIneligibleForDeletion.clear()
    }
  }
}

DeleteTopicsThread

DeleteTopicsThread繼承自ShutdownableThread,ShutdownableThread是一個可以迴圈執行某個方法(doWork方法)的執行緒,也提供了shutdown方法同步等待該執行緒真正執行結束,程式碼比較簡單。利用了CountDownLatch來阻塞呼叫shutdown的執行緒,待doWork真正執行結束時,再喚醒其他阻塞的執行緒。

ShutdownableThread

abstract class ShutdownableThread(val name: String, val isInterruptible: Boolean = true)
        extends Thread(name) with Logging {
  this.setDaemon(false)
  this.logIdent = "[" + name + "], "
  val isRunning: AtomicBoolean = new AtomicBoolean(true)
  private val shutdownLatch = new CountDownLatch(1)

  def shutdown() = {
    //設定running狀態為false
    initiateShutdown()
    //等待在執行的任務執行完畢
    awaitShutdown()
  }

  def initiateShutdown(): Boolean = {
    if(isRunning.compareAndSet(true, false)) {
      info("Shutting down")
      isRunning.set(false)
      if (isInterruptible)
        interrupt()
      true
    } else
      false
  }

  /**
   * After calling initiateShutdown(), use this API to wait until the shutdown is complete
   */
  def awaitShutdown(): Unit = {
    //等待執行緒執行結束
    shutdownLatch.await()
    info("Shutdown completed")
  }

  /**
   * This method is repeatedly invoked until the thread shuts down or this method throws an exception
   */
  def doWork(): Unit

  override def run(): Unit = {
    info("Starting ")
    try{
      while(isRunning.get()){
        doWork()
      }
    } catch{
      case e: Throwable =>
        if(isRunning.get())
          error("Error due to ", e)
    }
    //計數器減一,喚醒在awaitShutdown方法上等待的執行緒
    shutdownLatch.countDown()
    info("Stopped ")
  }
}

DeleteTopicsThread

當刪除Topic的事件通知到來,則doWork裡方法繼續往下執行:
當所有的replica都完成了topic的刪除動作,則呼叫completeDeleteTopic做最後的清理動作,包括zookeeper上節點的刪除,以及controller記憶體中的清理。
如果有replica將該topic標記為不可刪除(可能之前是由於該replica處於Preferred副本選舉或分割槽重分配的過程中),如果有,則重試將topic標記成刪除狀態
如果該topic可以被刪除,且還沒有處於已經開始刪除的狀態,則呼叫onTopicDeletion執行真正的刪除邏輯

  class DeleteTopicsThread() extends ShutdownableThread(name = "delete-topics-thread-" + controller.config.brokerId, isInterruptible = false) {
    val zkUtils = controllerContext.zkUtils
    override def doWork() {
      //等待刪除Topic的事件通知
      awaitTopicDeletionNotification()

      if (!isRunning.get)
        return

      inLock(controllerContext.controllerLock) {
        val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted

        if(!topicsQueuedForDeletion.isEmpty)
          info("Handling deletion for topics " + topicsQueuedForDeletion.mkString(","))

        topicsQueuedForDeletion.foreach { topic =>
          // if all replicas are marked as deleted successfully, then topic deletion is done
          //如果所有的replica都完成了topic的刪除
          if(controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
            // clear up all state for this topic from controller cache and zookeeper
            completeDeleteTopic(topic)
            info("Deletion of topic %s successfully completed".format(topic))
          } else {
            //至少一個replica在開始刪除狀態
            if(controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
              // ignore since topic deletion is in progress
              val replicasInDeletionStartedState = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionStarted)
              val replicaIds = replicasInDeletionStartedState.map(_.replica)
              val partitions = replicasInDeletionStartedState.map(r => TopicAndPartition(r.topic, r.partition))
              info("Deletion for replicas %s for partition %s of topic %s in progress".format(replicaIds.mkString(","),
                partitions.mkString(","), topic))
            } else {
              // if you come here, then no replica is in TopicDeletionStarted and all replicas are not in
              // TopicDeletionSuccessful. That means, that either given topic haven't initiated deletion
              // or there is at least one failed replica (which means topic deletion should be retried).
              //如果沒有replica處於開始刪除狀態(TopicDeletionStarted),並且也不是所有replica都刪除了該topic
              //則判斷是否有replica將該topic標記為不可刪除,如果有,則重試將topic標記成刪除狀態
              if(controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
                // mark topic for deletion retry
                markTopicForDeletionRetry(topic)
              }
            }
          }
          // Try delete topic if it is eligible for deletion.
          //如果該topic可以被刪除,且還沒有處於已經開始刪除的狀態
          if(isTopicEligibleForDeletion(topic)) {
            info("Deletion of topic %s (re)started".format(topic))
            // topic deletion will be kicked off
            //觸發topic刪除事件
            onTopicDeletion(Set(topic))
          } else if(isTopicIneligibleForDeletion(topic)) {
            info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
          }
        }
      }
    }
  }

completeDeleteTopic方法

完成刪除topic後會呼叫completeDeleteTopic進行一些清理工作,即:
刪除zookeeper上節點/brokers/topics/< topic_name >
刪除zookeeper上節點/config/topics/< topic_name >
刪除zookeeper上節點/admin/delete_topics/< topic_name >
並刪除記憶體中的topic相關資訊。

  private def completeDeleteTopic(topic: String) {
    // deregister partition change listener on the deleted topic. This is to prevent the partition change listener
    // firing before the new topic listener when a deleted topic gets auto created
    partitionStateMachine.deregisterPartitionChangeListener(topic)
    val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
    // controller will remove this replica from the state machine as well as its partition assignment cache
    replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)
    val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
    // move respective partition to OfflinePartition and NonExistentPartition state
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
    partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)
    topicsToBeDeleted -= topic
    partitionsToBeDeleted.retain(_.topic != topic)
    val zkUtils = controllerContext.zkUtils
    //刪除zookeeper上節點/brokers/topics/< topic_name >
    zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
    //刪除zookeeper上節點/config/topics/< topic_name >
    zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
    //刪除zookeeper上節點/admin/delete_topics/< topic_name >
    zkUtils.zkClient.delete(getDeleteTopicPath(topic))
    //最後移除記憶體中的topic相關資訊
    controllerContext.removeTopic(topic)
  }

markTopicForDeletionRetry方法

將topic標記成OfflineReplica狀態來重試刪除

  private def markTopicForDeletionRetry(topic: String) {
    // reset replica states from ReplicaDeletionIneligible to OfflineReplica
    val failedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionIneligible)
    info("Retrying delete topic for topic %s since replicas %s were not successfully deleted"
      .format(topic, failedReplicas.mkString(",")))
    controller.replicaStateMachine.handleStateChanges(failedReplicas, OfflineReplica)
  }

onTopicDeletion方法

onTopicDeletion最終會呼叫startReplicaDeletion方法,來開始刪除這個topic的所有分割槽

  private def onTopicDeletion(topics: Set[String]) {
    info("Topic deletion callback for %s".format(topics.mkString(",")))
    // send update metadata so that brokers stop serving data for topics to be deleted
    val partitions = topics.flatMap(controllerContext.partitionsForTopic)
    // 向各broker更新原資訊,使得他們不再向外提供資料服務,準備開始刪除資料
    controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
    val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
    topics.foreach { topic =>
      onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).map(_._1).toSet)
    }
  }

  private def onPartitionDeletion(partitionsToBeDeleted: Set[TopicAndPartition]) {
    info("Partition deletion callback for %s".format(partitionsToBeDeleted.mkString(",")))
    val replicasPerPartition = controllerContext.replicasForPartition(partitionsToBeDeleted)
    startReplicaDeletion(replicasPerPartition)
  }

  private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
    replicasForTopicsToBeDeleted.groupBy(_.topic).foreach { case(topic, replicas) =>
      var aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic.equals(topic))
      val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
      val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
      val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
      // move dead replicas directly to failed state
      //將所有已經掛掉的replica標記成ReplicaDeletionIneligible(無法刪除的Replica)
      replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
      // send stop replica to all followers that are not in the OfflineReplica state so they stop sending fetch requests to the leader
      //將所有未掛掉的replica標記成OfflineReplica(下線的Replica),併發送給相應的broker,這樣這些broker就不會再向Leader傳送該topic的同步請求(FetchRequest)
      replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
      debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
      //給所有replica傳送停止fetch請求,請求完成後,回撥deleteTopicStopReplicaCallback方法
      controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
        new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
      if(deadReplicasForTopic.size > 0) {
        debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
        markTopicIneligibleForDeletion(Set(topic))
      }
    }
  }
  //開始刪除topic開始時,會給存活的broker傳送停止fetch的請求,請求完成後,會回撥該方法
  private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractRequestResponse, replicaId: Int) {
    val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
    debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
    val responseMap = stopReplicaResponse.responses.asScala
    val partitionsInError =
      if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
      else responseMap.filter { case (_, error) => error != Errors.NONE.code }.map(_._1).toSet
    val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
    inLock(controllerContext.controllerLock) {
      // move all the failed replicas to ReplicaDeletionIneligible
      //若有replica出現錯誤,則將它踢出可刪除的Replica列表
      failReplicaDeletion(replicasInError)
      if (replicasInError.size != responseMap.size) {
        //有些Replica已經成功刪除了資料
        // some replicas could have been successfully deleted
        val deletedReplicas = responseMap.keySet -- partitionsInError
        completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
      }
    }
  }

Kafka刪除Topic的過程

分析完原始碼,我們總結下,Kafka刪除Topic的過程
1. Kafka的broker在被選舉成controller後,會執行下面幾步
1.1 註冊DeleteTopicsListener,監聽zookeeper節點/admin/delete_topics下子節點的變化,delete命令實際上就是要在該節點下建立一個節點,名字是待刪除topic名,標記該topic是待刪除的
1.2 建立一個單獨的執行緒DeleteTopicsThread,來執行topic刪除的操作
2. DeleteTopicsThread執行緒啟動時會先在awaitTopicDeletionNotification處阻塞並等待刪除事件的通知,即有新的topic被新增到queue裡等待被刪除。
3. 當我們使用了delete命令在zookeeper上的節點/admin/delete_topics下建立子節點< topic_name >。
4. DeleteTopicsListener會收到ChildChange事件會依次判斷如下邏輯:
4.1 查詢topic是否存在,若已經不存在了,則直接刪除/admin/delete_topics/< topic_name >節點。
4.2 查詢topic是否為當前正在執行Preferred副本選舉或分割槽重分配,若果是,則標記為暫時不適合被刪除。
4.3 並將該topic新增到queue中,此時會喚醒DeleteTopicsThread中doWork方法裡awaitTopicDeletionNotification處的阻塞執行緒,讓刪除執行緒繼續往下執行。

而刪除執行緒執行刪除操作的真正邏輯是:
1. 它首先會向各broker更新原資訊,使得他們不再向外提供資料服務,準備開始刪除資料。
2. 開始刪除這個topic的所有分割槽
2.1. 給所有broker發請求,告訴它們這些分割槽要被刪除。broker收到後就不再接受任何在這些分割槽上的客戶端請求了
2.2. 把每個分割槽下的所有副本都置於OfflineReplica狀態,這樣ISR就不斷縮小,當leader副本最後也被置於OfflineReplica狀態時leader資訊將被更新為-1
2.3 將所有副本置於ReplicaDeletionStarted狀態
2.4 副本狀態機捕獲狀態變更,然後發起StopReplicaRequest給broker,broker接到請求後停止所有fetcher執行緒、移除快取,然後刪除底層log檔案
2.5 關閉所有空閒的Fetcher執行緒
3. 刪除zookeeper上節點/brokers/topics/< topic_name >
4. 刪除zookeeper上節點/config/topics/< topic_name >
5. 刪除zookeeper上節點/admin/delete_topics/< topic_name >
6. 並刪除記憶體中的topic相關資訊。

Kafka刪除Topic的流程圖

Kafka刪除Topic的流程圖

Q&A

前面我們分析了Kafka刪除Topic的原始碼,也總結了其刪除的過程,下面我們再來看看下面這些相關問題,加深對這個過程的理解

Q1:有分割槽掛掉的情況下,是否能正常刪除?

修改三個broker的server.properties,分別開啟delete.topic.enable=true
啟動zookeeper和三個broker,broker1,broker2,broker3,並啟動kafka-manager,
其中zookeeper埠為2181,
broker1埠為9091,log目錄為D:\Workspaces\git\others\kafka\kafkaconifg\broker_1
broker2埠為9092,log目錄為D:\Workspaces\git\others\kafka\kafkaconifg\broker_2
broker3埠為9093,log目錄為D:\Workspaces\git\others\kafka\kafkaconifg\broker_3
kafka-manager埠為9000,訪問http://localhost:9000可以檢視kafka叢集情況

開始實驗,建立topic test

.\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 3 --partitions 6 --topic test

並寫入幾條測試訊息

.\kafka-console-producer.bat --broker-list localhost:9092 --topic test
111
222
333
444
555
666

觀察zookeeper中路徑

ls /brokers/topics/test/partitions
[0, 1, 2, 3, 4, 5]

關閉broker2,並執行刪除topic命令

.\kafka-topics.bat --delete --zookeeper localhost:2181 --topic test

觀察zookeeper中路徑/admin/delete_topics

[zk: localhost:2181(CONNECTED) 26] ls /admin/delete_topics
[test]

過幾秒後觀察只有broker2的log目錄下存在topic test的資料夾,而broker1和broker2的log目錄下已經刪除了test相關log
test-0,test-1,test-2,test-3,test-4,test-5

觀察broker1的controller.log

[2017-12-06 12:14:39,181] DEBUG [DeleteTopicsListener on 1]: Delete topics listener fired for topics test to be deleted (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,182] INFO [DeleteTopicsListener on 1]: Starting topic deletion for topics test (kafka.controller.PartitionStateMachine$DeleteTopicsListener)
[2017-12-06 12:14:39,184] INFO [delete-topics-thread-1], Handling deletion for topics test (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,186] DEBUG [Replica state machine on controller 1]: Are all replicas for topic test deleted Map([Topic=test,Partition=1,Replica=2] -> OfflineReplica, [Topic=test,Partition=5,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=2] -> OfflineReplica, [Topic=test,Partition=3,Replica=1] -> OnlineReplica, [Topic=test,Partition=2,Replica=2] -> OfflineReplica, [Topic=test,Partition=4,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=3] -> OnlineReplica, [Topic=test,Partition=2,Replica=3] -> OnlineReplica, [Topic=test,Partition=0,Replica=3] -> OnlineReplica, [Topic=test,Partition=3,Replica=3] -> OnlineReplica, [Topic=test,Partition=5,Replica=1] -> OnlineReplica, [Topic=test,Partition=0,Replica=1] -> OnlineReplica, [Topic=test,Partition=4,Replica=1] -> OnlineReplica, [Topic=test,Partition=3,Replica=2] -> OfflineReplica, [Topic=test,Partition=1,Replica=1] -> OnlineReplica, [Topic=test,Partition=5,Replica=2] -> OfflineReplica) (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,187] INFO [delete-topics-thread-1], Deletion of topic test (re)started (kafka.controller.TopicDeletionManager$DeleteTopicsThread)
[2017-12-06 12:14:39,188] INFO [Topic Deletion Manager 1], Topic deletion callback for test (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,191] INFO [Topic Deletion Manager 1], Partition deletion callback for [test,4],[test,0],[test,3],[test,2],[test,1],[test,5] (kafka.controller.TopicDeletionManager)
[2017-12-06 12:14:39,194] INFO [Replica state machine on controller 1]: Invoking state change to ReplicaDeletionIneligible for replicas [Topic=test,Partition=1,Replica=2],[Topic=test,Partition=4,Replica=2],[Topic=test,Partition=2,Replica=2],[Topic=test,Partition=0,Replica=2],[Topic=test,Partition=3,Replica=2],[Topic=test,Partition=5,Replica=2] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] INFO [Replica state machine on controller 1]: Invoking state change to OfflineReplica for replicas [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ReplicaStateMachine)
[2017-12-06 12:14:39,195] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":1,"leader_epoch":5,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,200] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,204] DEBUG [Controller 1]: Removing replica 1 from ISR 1,3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,207] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":3,"leader_epoch":5,"isr":[3]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,208] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,211] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":1,"leader_epoch":4,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,215] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,2]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] INFO [Controller 1]: New leader and ISR for partition [test,2] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,219] DEBUG [Controller 1]: Removing replica 3 from ISR 1,3 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":3,"isr":[1]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,223] DEBUG [Controller 1]: Removing replica 3 from ISR 3 for partition [test,3]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] INFO [Controller 1]: New leader and ISR for partition [test,3] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,227] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,5]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,230] INFO [Controller 1]: New leader and ISR for partition [test,5] is {"leader":-1,"leader_epoch":6,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,231] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,0]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] INFO [Controller 1]: New leader and ISR for partition [test,0] is {"leader":-1,"leader_epoch":4,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,235] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,4]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,239] INFO [Controller 1]: New leader and ISR for partition [test,4] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,240] DEBUG [Controller 1]: Removing replica 1 from ISR 1 for partition [test,1]. (kafka.controller.KafkaController)
[2017-12-06 12:14:39,243] INFO [Controller 1]: New leader and ISR for partition [test,1] is {"leader":-1,"leader_epoch":5,"isr":[]} (kafka.controller.KafkaController)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 1 is  (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 1 is [Topic=test,Partition=2,Replica=1],[Topic=test,Partition=3,Replica=1],[Topic=test,Partition=5,Replica=1],[Topic=test,Partition=0,Replica=1],[Topic=test,Partition=4,Replica=1],[Topic=test,Partition=1,Replica=1] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = true) sent to broker 3 is  (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,244] DEBUG The stop replica request (delete = false) sent to broker 3 is [Topic=test,Partition=5,Replica=3],[Topic=test,Partition=4,Replica=3],[Topic=test,Partition=1,Replica=3],[Topic=test,Partition=2,Replica=3],[Topic=test,Partition=0,Replica=3],[Topic=test,Partition=3,Replica=3] (kafka.controller.ControllerBrokerRequestBatch)
[2017-12-06 12:14:39,245] DEBUG [Topic Deletion Manager 1], Dele