1. 程式人生 > >kafka實戰 - 刪除topic

kafka實戰 - 刪除topic

默認值 的區別 正常 方法 物理文件 bin 版本 感覺 ger

概述

  在平時對kafka的運維工作中,我們經常會由於某些原因去刪除一個topic,比如這個topic是測試用的,生產環境中需要刪除。或者我想擴容topic的同時,這個topic中的數據我不想要了,這時候刪除topic,增加broker,再重新創建topic就會是比較簡單的方法。但是kafka刪除topic時,有很多關鍵的點必須清楚,否則在刪除topic的時候就會出現各種各樣的問題。

  我測試環境使用的kafka版本是0.10.2.0,不同版本的kafka默認配置和bin目錄下腳本使用的方式略有不同,以下討論僅在0.10.2.0版本的kafka中實測過。使用的producer和consumer均為php客戶端。

推薦的自動化的刪除方法

  在kafka0.8.2.x之後的kafka都支持自動化刪除topic,並且官方提供了把這個功能做到了bin/kafka-topics.sh中。只需要下面簡單的一步,和這個topic相關的數據就會被刪除:

    bin/kafka-topics.sh --zookeeper zk_host:port/chroot --delete --topic my_topic_name

  如果使用這種刪除方法,需要註意以下幾個問題:

    1. config文件中的delete.topic.enable需要設置為true

      在0.10.2.0版本中,這個參數默認是為false的。經過實測,如果這個參數不顯式地指定為true,上面的命令和沒執行一樣,producer該生產生產,consumer該消費消費,consumer_group的logsize也正常顯式。broker的刪除原理就是用戶在zookeeper/admin/delete_topics中創建一個節點(以topic命名),controller(其實就是某個選舉出來的broker),監聽zookeeper的/admin/delete_topics目錄,如果發現有新的節點創建,則會啟動刪除topic的邏輯(可以參考這篇博文:https://www.cnblogs.com/huxi2b/p/4842695.html)。如果你ls一下這個目錄,發現如果delete.topic.enable為false,這個臨時節點也是不會創建的,也就是說,刪除topic的邏輯壓根不會啟動。

    2. 停止producer和consumer

      不停producer和consumer不一定會發生問題,但是停掉了一定不會發生問題。要是不是什麽實時的業務,還是停掉比較好一些。在接下來的討論裏面,你就會發現,在刪除topic的同時,不停止producer和consumer會產生多麽復雜的情況,而且搞不好還會漏消費數據,造成數據丟失的情況。
      在開始討論之前,不得不提到一個config文件中的參數,auto.create.topics.enable。該參數在官方文檔中的描述是這樣的:Enable auto creation of topic on the server。在實踐中的效果是這樣的:如果你給一個不存在的topic中produce數據,或者你給一個不存
在 的topic發起consume請求,那麽這個topic就會自動被創建。與這個配置相關的配置還有2個,num.partitions和default.replication.factor,分別控制自動創建的topic的partition數和副本數。在0.10.2.0版本kafka中,默認值是auto.create.topics.enable=true, num.partitions=1, default.replication.factor=1。

     第一種情況:auto.create.topics.enable關閉

      根據實測,當producer正在生產,或者consumer正在消費的時候,執行delete topic的命令行,producer會被卡住,consumer也會停止消費,topic被刪除,logsize變為0。在不重啟producer進程和consumer進程的情況下,如果手動重新創建topic,這時producer開始成功生產數據,consumer也開始消費數據,消費到的是topic重建後producer產生的數據,lag也顯示正常,就和創建好topic,再開啟producer和consumer是一樣的。可以看到,如果auto.crete.topics.enable為false的情況下,即便是不關閉producer和consumer,直接調用delete topic命令行,也不會發生出現錯誤。

    第二種情況,auto.create.topics.enable開啟

      在auto.create.topics.enable為true的場景下, 需要討論的情況就有很多了:  

      (1)如果是只有1個partition,只有1個consumer消費,且num.partitions=1

       假設producer生產數據比consumer消費數快,而且已經運行了一段時間了,也就是說已經積壓了一些數據了。
       此時,不關閉producer和consumer,直接調用delete topic的命令行,你會發現:原來的topic確實已經被刪除了,但是因為producer和consumer正在運行,所以broker有重新創建了一個partition為1的topic,而且logsize為0。但是很快,因為producer並不會因為topic被重新創建了而停止,所以logsize會繼續從0開始增長,增長的數量就是topic被重建後,producer生產成功的消息條數,producer的行為很好理解。但是consumer就會出現一些令人費解的行為,首先是consumer會繼續消費topic被重建之前,producer生產的數據,直到把這些數據消費完畢。但是理論上delete topic命令行一執行,log文件被刪除了(logsize也顯示為0,也不是poll緩存下的,我已經把receive.message.max.bytes設置的非常小了),為什麽還能消費得到。這個問題暫時沒有找到答案,可能是因為page cache的原因。但是根據測試確實會發生這樣的情況,這種情況就會造成consumer消費了本來應該是被刪除的數據。而且在此期間,lag有一段時間會為負值,是因為logsize變為了從0開始增加,但是consumer給broker提交的offset仍然是topic重建之前的值,所以lag=logsize-consumer_offset,也就變成了負值。第二個異常行為是,consumer把topic重建前producer生產的數據消費完之後,不能繼續消費topic重建之後producer生產的數據,會顯示RD_KAFKA_RESP_ERR_PARTITION_EOF錯誤。除非重啟,才有可能正確消費數據。為什麽說可能呢?因為在auto.create.topics.enable=true且producer和consumer正在運行的情況下,topic是被刪除了,但是consumer_group並沒有被刪除。也就是說,consumer重啟之後,會從上次被殺掉時候的offset開始消費新的日誌。分為以下2種情況:

       <1> topic重建後,producer新生產的數據的個數小於consumer被殺掉最後提交的offset

         技術分享圖片

         Topic重建後,producer新生產的數據的個數小於consumer offset,所以lag=LogSize-ConsumerOffset為負。如果此時重啟consumer,則consumer下次給broker發送的fetch請求是消費offset=26的數據,大於LogSize。根據實測,會從offset=0開始消費,也就是正常從頭開始消費,不會漏掉數據,lag也會變為從12開始遞減。

        <2> topic重建後,producer新生產的數據的個數大於consumer被殺掉最後提交的offset

          技術分享圖片

          Topic重建後,由於producer生產數據快於consumer的消費速度,此時新生產的數據個數已經大於ConsumerOffset。如果此時重啟consumer,則consumer下次給broker發送的fetch請求是消費offset為40的數據,那麽broker理所當然地把新生產的數據中的offset為40的數據發送給consumer,consumer也從此在這個offset開始消費。同時新生產的數據中0~39的數據就永遠都不會被消費了,造成了丟失數據的後果。

      (2)如果是只有1個partition,但是有多個consumer消費,且num.partitions=1   

         對,你沒有看錯。就是多個consumer消費一個partition。這種情況和上一種情況唯一的區別就是consumer的個數。但是topic重建後,發現logsize並不會成為0,producer繼續生產數據,logsize增加相應的個數。不重啟consumer,consumer會把topic重建之前生產的數據全部消費完,這造成消費了本來應該刪除數據的錯誤,然後接著消費producer新生產的數據,並不會卡住。producer和consumer還有整個topic的各項參數就像什麽都沒有發生一樣。

      上面舉的兩個例子是最簡單的兩種情況,但是已經可以感覺得到如果在刪除topic時auto.create.topic.enable=true並且不關閉producer和consumer產生的結果有多復雜了。筆者還做過更多的測試,比如原始partition個數並不為1,但是num.partition為1(自動創建的topic的partition個數為1)。或者原始partition個數不為1,num.partition也不為1, 但是數值不一樣。再加上consumer個數是否為1個這兩種情況,結果分析起來會更加復雜。 所以,如果為了代碼寫起來方便,執意設置auto.create.topic.enable=true, 那麽在刪除topic時,還是關閉producer和consumer為妙。
      使用這個命令行還需要註意的一點是,如果某臺broker掛掉了,也會影響到刪除topic。具體的表現就是producer和consumer都被卡住了。topic顯示刪除不掉。但是如果把broker再拉起來,topic就會被成功刪除了。
但是獲過頭來想,delete.topic.enable默認為false其實也是符合生產環境安全需求的。假設有多個部門在共享一個kafka集群。如果某天另外一個部門的同事delete topic的時候把topic粘貼錯了,或者他根本不知道有這個topic。這樣topic會被直接刪除,這在線上環境是很嚴重的事情。

  

手動的刪除方法

  1. 停止producer和consumer

   2. 停止kafka(不是停止zookeeper,因為第4步要用到zookeeper)

   3. 刪除config文件中log.dir下的topic相關文件

   4. 在zookeeper上刪除 /config/topics/topic_name, /brokers/topics/topic_name, /admin/topic_name

  重啟kafka之後,發現topic被刪除。這時可以自行創建新的topic。

  關於是否一定要停止kafka才能手動刪除topic,筆者做了一些測試。關閉了producer,關閉了consumer。然後做了第3步和第4步。然後重啟producer和consumer。發現producer可以繼續produce成功,但是不會生成物理文件,也不會在zookepe的/brokers/topics/和/config/topics/, /admin/delete_topics中生成任何數據。。開啟多個consumer可以繼續消費(消費到的是刪除topic之前producer生產的數據,消費的可能是broker的page cache中的東西),但是去log_dir下看,沒有物理文件。zookeper的/brokers/topics/和/config/topics/, /admin/delete_topics中沒有任何數據。這造成了consumer消費了本該刪除的數據,producer丟失了生產的數據的後果。所以手動刪除topic還是停止kafka,producer,consumer比較好。

  使用這個方法的時候有一點要特別註意。這種方式會造成上面講到的topic雖然刪除了,但是consumer_group依然存在的問題。如果topic重建之後,producer先運行,且新生產的數據個數大於consumer被殺掉時的ConsumerOffset,那麽就會造成開頭一部分數據無法消費到。如果新生產的數據少於consumer被殺掉時的ConsumerOffset,那麽從offset=0開始消費。刪除ConsumerOffset,在0.10.2.0版本中沒有提供,因為這些東西都是保存在__consumer_offset topic中的。沒有很方便的腳本把某個consumer_group的位移信息從__consumer_offset中刪除。 而且這個topic保存了所有consumer_group的位移信息,亂動這個topic需要承擔其他consumer_group位移丟失的風險。如果某個consumer_group在一定時間內,如果再也沒有consumer加入,那麽這個consumer_group的位移信息將會被自動刪除。這個時間由config中的offsets.retention.minutes參數控制,默認是1天。解決剛才說的consumer_group在topic刪除後仍然存留的問題可以通過重置offset的方式實現。在kafka reset offset 0.11 版提供了命令行的方法。筆者已經使用0.11版的命令行去操作0.10版的kafka, 測試顯示一切正常。

  使用方式可以參考這個文檔:https://www.cnblogs.com/huxi2b/p/7284767.html

參考資料

  刪除topic的邏輯:

    https://www.cnblogs.com/huxi2b/p/4842695.html

  幾個topic相關的kafka配置參數:

    https://www.learningjournal.guru/courses/kafka/kafka-foundation-training/broker-configurations/

  一些刪除topic相關討論:

    https://stackoverflow.com/questions/33537950/how-to-delete-a-topic-in-apache-kafka

    https://stackoverflow.com/questions/25568178/kafka-server-stop-sh-not-working-when-kafka-started-from-python-script

    http://wanwenli.com/kafka/2016/11/04/Kafka-Group-Coordinator.html
    https://github.com/yahoo/kafka-manager/issues/341
    https://stackoverflow.com/questions/29243896/removing-a-kafka-consumer-group-in-zookeeper 

    https://stackoverflow.com/questions/39529511/what-is-the-use-of-consumer-offsets-and-schema-topics-in-kafka

  

       

kafka實戰 - 刪除topic