1. 程式人生 > >kafka 常用運維命令介紹(一)

kafka 常用運維命令介紹(一)

文章目錄

一、連線zk

由於kafka的各種元資料都儲存在zk,要連線kafka叢集也要通過zk獲取各個broker的ip埠然後連線broker。因此,大多數kafka自帶的運維命令都要指定zk的地址,比如用kafka-topics列出所有topics:

kafka-topics --zookeeper localhost:2181/kafka --list

--zookeeper引數是必須指定的。另外,如果kafka叢集啟動的時候在配置檔案中指定了namespace,記得要在zk的地址後面也要加上kafka所屬的namespace。否則kafka就找不到kafka叢集的相關元資料了。

由於kafka的元資料都儲存在zk,因此掌握好如何檢視zk的資料也是運維kafka叢集的一個關鍵。

zkCli 命令

zookeeper安裝包一般都會提供zkCli命令來讓使用者連線zookeeper叢集。

./zkCli.sh -timeout 5000 -r -server ip:port

之後進入zk的互動介面,就可以輸出相關命令檢視zk的資料了。

# 檢視kafka叢集下所有的brokers id列表
ls /brokers/ids
# 檢視 1003 broker的資訊
get /brokers/ids/1003

其他zk互動介面的命令這裡不多做介紹,zkCli的幫助文件已經寫的很清楚了。

二、topic 相關

kafka-topics可以進行和topics相關的一些操作。下面介紹一下如何運用該命令來操作kafka topics。

該命令最終是呼叫kafka原始碼中的TopicCommand類來實現的。

列出所有的topic & 獲取命令幫助

# 列出幫助文件,英文好的同學基本看幫助文件就可以指定大概怎麼使用該命令了
kafka-topics --help
# 列出kafka叢集下的所有topics,這裡需要指定kafka機器元資料儲存所在的zk機器地址,記得如果有namespace,要也加上,否則將連不上kafka叢集
 kafka-topics --zookeeper localhost:2181/kafka --list

建立topic

# 建立一個topic為test的topic,並指定分割槽數為5,副本數為1。這裡的副本數不能超過broker的數量,否則會報錯
kafka-topics --topic test --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 5
# 建立時指定副本在哪個broker上,多個partition之間用逗號分隔,副本之間用":"分割,第一個副本預設是leader
kafka-topics.sh --zookeeper 172.19.0.5:2181 --topic lyt2 --create --replica-assignment 1001:1002,1001:1002,1001:1002

列出所有topic的詳情

通過 --describe 引數可以列出我們指定的topics詳情,包括 partitions、leader、replicas、isr等。

kafka-topics --zookeeper localhost:2181/kafka --describe test test_yangjb
# 輸出
Topic:test	PartitionCount:5	ReplicationFactor:3	Configs:
	Topic: test	Partition: 0	Leader: 1001	Replicas: 1001,1002,1003	Isr: 1002,1001,1003
	Topic: test	Partition: 1	Leader: 1002	Replicas: 1002,1003,1001	Isr: 1002,1003,1001
	Topic: test	Partition: 2	Leader: 1003	Replicas: 1003,1001,1002	Isr: 1002,1001,1003
	Topic: test	Partition: 3	Leader: 1001	Replicas: 1001,1003,1002	Isr: 1002,1001,1003
	Topic: test	Partition: 4	Leader: 1002	Replicas: 1002,1001,1003	Isr: 1002,1001,1003

下面是一些 使用—describe時可以使用的其他引數

# 只列出修改了預設配置的那些topic。並可以檢視修改了哪些topic配置
--topics-with-overrides
# 列出那些目前沒有leader的topic
--under-replicated-partitions
# 列出那些正在同步的topic或者同步出現異常的topic
--under-replicated-partitions

刪除topic

注意,kafka刪除topic是非同步的,因此並不是命令返回了topic就已經被成功刪除。而是等待後臺的刪除任務執行成功才真正刪除該topic。

kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test

修改topic相關資訊

通過 --alter 引數可以修改topic的資訊,能修改的資訊包括 partition數量、replica分配情況、topic配置。如果要修改 partition數量時,修改的後的數量一定要比當前的數量大,否則會報錯。

# 將partition數量修改成7個
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
# 通過 --replica-assignment 引數指定新增partition的副本分佈情況
# 如果原先的partition數量是3,那麼新增的一個分割槽的副本分佈應該在1002和1003
kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 1001:1002,1001:1002,1001:1002,1002:1003
# 修改topic test的配置 flush.ms =30000 。
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --config flush.ms=30000
# 刪除topic test的 flush.ms 配置
kafka-topics --zookeeper localhost:2181/kafka  --topic test --alter --delete-config flush.ms

注意,在後續的kafka版本中,關於topic的配置的修改刪除可能會被移到kafka-configs.sh中。官方建議使用kafka-configs來修改topic的配置。

三、分割槽副本重分配

在資料量大的情況下,各個broker上的資料量經常會不一致,有的broker上資料非常大,有的則很小,為了讓資料更均勻的分佈在各個broker,我們就要學會對topic的partion進行分割槽副本重分配。

首先建立一個json檔案,用來描述如何分配分割槽副本。

assign.json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 1,
      "replicas": [
        1002,
        1003
      ]
    },
    {
      "topic": "test",
      "partition": 2,
      "replicas": [
        1003,
        1002
      ]
    }
  ],
  "version": 1
}

檔案中只要指定要重新分配副本的分割槽號就可以,不需要列出所有分割槽。

提交分割槽副本重分配任務:

# --execute 引數表示執行
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --execute
# --verify 引數表示檢視分割槽副本重分配任務的執行狀態
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --verify

讓系統自動幫我們生成重分配json檔案:

執行命令之前需要建立一個json檔案,告訴系統要重分配哪些分割槽:

gen.json:

{
  "topics": [
    {
      "topic": "foo"
    }
  ],
  "version": 1
}

接著執行命令

# --generate 表示生成重分配的json檔案
# --topics-to-move-json-file 指定要重分配哪些topic
# --broker-list 表示要分配到哪些broker上去
kafka-reassign-partitions --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file gen.json --broker-list 1001,1002,1003

其他引數

# 指定重分配時,在一個broker上,各個日誌目錄之間複製資料的閾值,最低要求 1 KB/s
# 如果重分配任務正在進行,第二次執行會修改原來設定的閾值
--replica-alter-log-dirs
# 指定重分配時,在不同broker之間傳輸資料的閾值,最低要求 1 KB/s
# 如果重分配任務正在進行,第二次執行會修改原來設定的閾值
--throttle
# 等待重分配任務開始的超時時間
--timeout

分割槽副本重分配過程

詳情可以看kafka原始碼的KafkaController#onPartitionReassignment()的方法註解。

RAR = Reassigned replicas,目標要分配的副本情況
OAR = Original list of replicas for partition,原先的副本分配情況
AR = current assigned replicas,當前的副本分配情況

  1. 更新zk處的partition副本配置:AR=RAR+OAR
  2. 向所有RAR+OAR的副本傳送元資料更新請求
  3. 將新增的那部分的副本狀態設定為NewReplica。也就是 RAR-OAR 那部分副本
  4. 等待所有的副本和leader保持同步。也就是抱著RAR+OAR的副本都在isr中了
  5. 將所有在RAR中的副本狀態都設定為OnlineReplica
  6. 在記憶體中先將AR=RAR
  7. 如果leader不在RAR中,就需要重新競選leader。採用ReassignedPartitionLeaderSelector選舉
  8. 將所有準備移除的副本狀態設定為OfflineReplica。也就是OAR-RAR的那部分副本。這時partition的isr會收縮
  9. 將所有準備移除的副本狀態設定為NonExistentReplica。這時所在的分割槽副本資料會被刪除。
  10. 將記憶體中的AR更新到zk
  11. 更新zk的/admin/reassign_partitions路徑,移除這個partition
  12. 傳送新的元資料到各個broker上

假設當前有OAR = {1, 2, 3}, RAR = {4,5,6},在進行partition reaassigned的過程中會發生如下變化

AR leader/isr 步驟
{1,2,3} 1/{1,2,3} 初始狀態
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步驟2
{1,2,3,4,5,6} 1/{1,2,3,4,5,6} 步驟4
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步驟7
{1,2,3,4,5,6} 4/{1,2,3,4,5,6} 步驟8
{4,5,6} 4/{4,5,6} 步驟10

四、刪除某個partition的資料

使用kafka-delete-records命令可以刪除指定topic-partition在指定offset之前的所有資料。

該命令是kafka在0.11版本之後才支援的。

首先需要編寫刪除offset描述json檔案:

delete.json

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0,
      "offset": 24
    }
  ],
  "version": 1
}

上面的json檔案表示刪除topic是test的0號parition的24之前的所有offset,也就是1-23這些offset的資料都會被刪除掉。

kafka-delete-records --bootstrap-server 127.0.0.1:9092 --offset-json-file delete.json

五、全域性&topic配置修改

通過kafka-configs命令,我們可以修改broker的配置,以及topic的配置、client和user的配置。

配置更新原理

kafka-configs命令修改配置後會被寫到對應的zookeeper的節點上持久化,之後kafka叢集重啟後還會載入這些配置,並覆蓋配置檔案的那些配置。也就是說,如果在此處設定了某個配置項,之後在配置檔案中對這個配置項的改動都不會起作用,因為被覆蓋了。

用該命令修改了配置後,可以在zk的節點下看到對應的配置內容。

節點目錄一般是 /config/entityType/entityName,entityType可以是brokers、topics、users、clients。entityName表示具體的名稱,比如broker的id,topic的名稱等。

比如要看0號broker修改過的配置項,可以在zk互動介面中輸入

# /kafka 是名稱空間
get /kafka/config/brokers/0

修改broker配置

# 將0號broker的配置 log.cleaner.backoff.ms修改成1000,flush.ms 也修改成1000
# --alter 表示要修改配置項
# --add-config 後面跟著要修改的配置項
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --add-config log.cleaner.backoff.ms=1000,flush.ms=1000 --alter
# 刪除0號broker 對 log.cleaner.backoff.ms的配置
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --delete-config log.cleaner.backoff.ms --alter
# 列出0號broker修改過的配置項
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --describe

修改topic的配置

# 將test這個topic的 delete.retention.ms修改成1000,flush.ms 也修改成1000
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --add-config delete.retention.ms=1000,flush.ms=1000 --alter
# 刪除test這個topic的 delete.retention.ms和flush.ms配置項
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --delete-config delete.retention.ms,flush.ms --alter
# 列出 test這個topic修改過的配置項
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --describe

修改client的配置

這裡的client是指客戶端,也就是produer或者consumer。客戶端支援修改的配置有

# 請求限制
request_percentage
# 推送訊息時的流量控制
producer_byte_rate
# 消費時的流量控制
consumer_byte_rate

通過指定clientId我們可以控制指定客戶端的配置,從而控制他們的流量不會超過我們設定的值

# 設定 客戶端id 為test的 producer_byte_rate和consumer_byte_rate為1024
kafka-configs --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name test

六、檢視broker上磁碟的使用情況

在0.11版本中,新增一個命令kafka-log-dirs可以檢視broker的磁碟使用情況。

該命令可以從兩個維度觀察磁碟的使用情況,一個是指定broker id,檢視該broker的資料目錄的各個topic parition的佔用大小。還可以直接指定topic,檢視這些topic的partition在各個broker上的使用情況。甚至可以兩個過濾條件一起用,同時指定brokerId和topic。

# 檢視0、1號broker上各個topic partition的磁碟使用情況
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --broker-list 0,1 --describe
# 檢視topic:test 在各個broker上的磁碟使用情況
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --describe
# 檢視topic test 在0號broker上的磁碟使用情況
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --broker-list 0 --describe

輸出示例:

{
  "version": 1,
  "brokers": [
    {
      "broker": 1001,
      "logDirs": [
        {
          "logDir": "/kafka/kafka-logs-7da01186c90a",
          "error": null,
          "partitions": [
            {
              "partition": "test-4",
              "size": 0,
              "offsetLag": 0,
              "isFuture": false
            },
            {
              "partition": "test-0",
              "size": 0,
              "offsetLag": 0,
              "isFuture": false
            }
          ]
        }
      ]
    }
  ]
}

七、使用kafka-preferred-replica-election進行leader選舉

當我們檢視某個topic partition時,會輸出該partiton replica的列表,其中replica列表的第一個replica被kafka稱為preferred replica。

Topic: test	Partition: 0	Leader: 1002	Replicas: 1001,1002,1003	Isr: 1002,1001,1003

上面的test partition-0中,1001就是那個preferred replica。在大多情況下,preferred replica一般就是leader,但是有些情況可能不是。因此,kafka提供了kafka-preferred-replica-election來將preferred replica選舉成leader。

首先我們需要編輯prefered.json 檔案:

{
  "partitions": [
    {
      "topic": "test",
      "partition": 0
    }
  ]
}

該檔案告訴工具我們要對topic test的partition-0進行preferred選舉:

kafka-preferred-replica-election --zookeeper 127.0.0.1:2181 --path-to-json-file prefered.json