kafka 常用運維命令介紹(一)
文章目錄
一、連線zk
由於kafka的各種元資料都儲存在zk,要連線kafka叢集也要通過zk獲取各個broker的ip埠然後連線broker。因此,大多數kafka自帶的運維命令都要指定zk的地址,比如用kafka-topics列出所有topics:
kafka-topics --zookeeper localhost:2181/kafka --list
--zookeeper
引數是必須指定的。另外,如果kafka叢集啟動的時候在配置檔案中指定了namespace,記得要在zk的地址後面也要加上kafka所屬的namespace。否則kafka就找不到kafka叢集的相關元資料了。
由於kafka的元資料都儲存在zk,因此掌握好如何檢視zk的資料也是運維kafka叢集的一個關鍵。
zkCli 命令
zookeeper安裝包一般都會提供zkCli命令來讓使用者連線zookeeper叢集。
./zkCli.sh -timeout 5000 -r -server ip:port
之後進入zk的互動介面,就可以輸出相關命令檢視zk的資料了。
# 檢視kafka叢集下所有的brokers id列表
ls /brokers/ids
# 檢視 1003 broker的資訊
get /brokers/ids/1003
其他zk互動介面的命令這裡不多做介紹,zkCli的幫助文件已經寫的很清楚了。
二、topic 相關
kafka-topics可以進行和topics相關的一些操作。下面介紹一下如何運用該命令來操作kafka topics。
該命令最終是呼叫kafka原始碼中的TopicCommand類來實現的。
列出所有的topic & 獲取命令幫助
# 列出幫助文件,英文好的同學基本看幫助文件就可以指定大概怎麼使用該命令了
kafka-topics --help
# 列出kafka叢集下的所有topics,這裡需要指定kafka機器元資料儲存所在的zk機器地址,記得如果有namespace,要也加上,否則將連不上kafka叢集
kafka-topics --zookeeper localhost:2181/kafka --list
建立topic
# 建立一個topic為test的topic,並指定分割槽數為5,副本數為1。這裡的副本數不能超過broker的數量,否則會報錯
kafka-topics --topic test --zookeeper localhost:2181/kafka --create --replication-factor 1 --partitions 5
# 建立時指定副本在哪個broker上,多個partition之間用逗號分隔,副本之間用":"分割,第一個副本預設是leader
kafka-topics.sh --zookeeper 172.19.0.5:2181 --topic lyt2 --create --replica-assignment 1001:1002,1001:1002,1001:1002
列出所有topic的詳情
通過 --describe
引數可以列出我們指定的topics詳情,包括 partitions、leader、replicas、isr等。
kafka-topics --zookeeper localhost:2181/kafka --describe test test_yangjb
# 輸出
Topic:test PartitionCount:5 ReplicationFactor:3 Configs:
Topic: test Partition: 0 Leader: 1001 Replicas: 1001,1002,1003 Isr: 1002,1001,1003
Topic: test Partition: 1 Leader: 1002 Replicas: 1002,1003,1001 Isr: 1002,1003,1001
Topic: test Partition: 2 Leader: 1003 Replicas: 1003,1001,1002 Isr: 1002,1001,1003
Topic: test Partition: 3 Leader: 1001 Replicas: 1001,1003,1002 Isr: 1002,1001,1003
Topic: test Partition: 4 Leader: 1002 Replicas: 1002,1001,1003 Isr: 1002,1001,1003
下面是一些 使用—describe
時可以使用的其他引數
# 只列出修改了預設配置的那些topic。並可以檢視修改了哪些topic配置
--topics-with-overrides
# 列出那些目前沒有leader的topic
--under-replicated-partitions
# 列出那些正在同步的topic或者同步出現異常的topic
--under-replicated-partitions
刪除topic
注意,kafka刪除topic是非同步的,因此並不是命令返回了topic就已經被成功刪除。而是等待後臺的刪除任務執行成功才真正刪除該topic。
kafka-topics --zookeeper localhost:2181/kafka --delete --topic yangjb_test
修改topic相關資訊
通過 --alter
引數可以修改topic的資訊,能修改的資訊包括 partition數量、replica分配情況、topic配置。如果要修改 partition數量時,修改的後的數量一定要比當前的數量大,否則會報錯。
# 將partition數量修改成7個
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --partitions 7
# 通過 --replica-assignment 引數指定新增partition的副本分佈情況
# 如果原先的partition數量是3,那麼新增的一個分割槽的副本分佈應該在1002和1003
kafka-topics --zookeeper localhost:2181/kafka --topic test -alter --partitions 4 --replica-assignment 1001:1002,1001:1002,1001:1002,1002:1003
# 修改topic test的配置 flush.ms =30000 。
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --config flush.ms=30000
# 刪除topic test的 flush.ms 配置
kafka-topics --zookeeper localhost:2181/kafka --topic test --alter --delete-config flush.ms
注意,在後續的kafka版本中,關於topic的配置的修改刪除可能會被移到kafka-configs.sh中。官方建議使用kafka-configs來修改topic的配置。
三、分割槽副本重分配
在資料量大的情況下,各個broker上的資料量經常會不一致,有的broker上資料非常大,有的則很小,為了讓資料更均勻的分佈在各個broker,我們就要學會對topic的partion進行分割槽副本重分配。
首先建立一個json檔案,用來描述如何分配分割槽副本。
assign.json
:
{
"partitions": [
{
"topic": "test",
"partition": 1,
"replicas": [
1002,
1003
]
},
{
"topic": "test",
"partition": 2,
"replicas": [
1003,
1002
]
}
],
"version": 1
}
檔案中只要指定要重新分配副本的分割槽號就可以,不需要列出所有分割槽。
提交分割槽副本重分配任務:
# --execute 引數表示執行
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --execute
# --verify 引數表示檢視分割槽副本重分配任務的執行狀態
kafka-reassign-partitions --zookeeper localhost:2181/kafka --reassignment-json-file assign.json --verify
讓系統自動幫我們生成重分配json檔案:
執行命令之前需要建立一個json檔案,告訴系統要重分配哪些分割槽:
gen.json
:
{
"topics": [
{
"topic": "foo"
}
],
"version": 1
}
接著執行命令
# --generate 表示生成重分配的json檔案
# --topics-to-move-json-file 指定要重分配哪些topic
# --broker-list 表示要分配到哪些broker上去
kafka-reassign-partitions --zookeeper localhost:2181/kafka --generate --topics-to-move-json-file gen.json --broker-list 1001,1002,1003
其他引數
# 指定重分配時,在一個broker上,各個日誌目錄之間複製資料的閾值,最低要求 1 KB/s
# 如果重分配任務正在進行,第二次執行會修改原來設定的閾值
--replica-alter-log-dirs
# 指定重分配時,在不同broker之間傳輸資料的閾值,最低要求 1 KB/s
# 如果重分配任務正在進行,第二次執行會修改原來設定的閾值
--throttle
# 等待重分配任務開始的超時時間
--timeout
分割槽副本重分配過程
詳情可以看kafka原始碼的KafkaController#onPartitionReassignment()
的方法註解。
RAR = Reassigned replicas,目標要分配的副本情況
OAR = Original list of replicas for partition,原先的副本分配情況
AR = current assigned replicas,當前的副本分配情況
- 更新zk處的partition副本配置:AR=RAR+OAR
- 向所有RAR+OAR的副本傳送元資料更新請求
- 將新增的那部分的副本狀態設定為NewReplica。也就是 RAR-OAR 那部分副本
- 等待所有的副本和leader保持同步。也就是抱著RAR+OAR的副本都在isr中了
- 將所有在RAR中的副本狀態都設定為OnlineReplica
- 在記憶體中先將AR=RAR
- 如果leader不在RAR中,就需要重新競選leader。採用ReassignedPartitionLeaderSelector選舉
- 將所有準備移除的副本狀態設定為OfflineReplica。也就是OAR-RAR的那部分副本。這時partition的isr會收縮
- 將所有準備移除的副本狀態設定為NonExistentReplica。這時所在的分割槽副本資料會被刪除。
- 將記憶體中的AR更新到zk
- 更新zk的/admin/reassign_partitions路徑,移除這個partition
- 傳送新的元資料到各個broker上
假設當前有OAR = {1, 2, 3}, RAR = {4,5,6},在進行partition reaassigned的過程中會發生如下變化
AR | leader/isr | 步驟 |
---|---|---|
{1,2,3} | 1/{1,2,3} | 初始狀態 |
{1,2,3,4,5,6} | 1/{1,2,3,4,5,6} | 步驟2 |
{1,2,3,4,5,6} | 1/{1,2,3,4,5,6} | 步驟4 |
{1,2,3,4,5,6} | 4/{1,2,3,4,5,6} | 步驟7 |
{1,2,3,4,5,6} | 4/{1,2,3,4,5,6} | 步驟8 |
{4,5,6} | 4/{4,5,6} | 步驟10 |
四、刪除某個partition的資料
使用kafka-delete-records命令可以刪除指定topic-partition在指定offset之前的所有資料。
該命令是kafka在0.11版本之後才支援的。
首先需要編寫刪除offset描述json檔案:
delete.json
{
"partitions": [
{
"topic": "test",
"partition": 0,
"offset": 24
}
],
"version": 1
}
上面的json檔案表示刪除topic是test的0號parition的24之前的所有offset,也就是1-23這些offset的資料都會被刪除掉。
kafka-delete-records --bootstrap-server 127.0.0.1:9092 --offset-json-file delete.json
五、全域性&topic配置修改
通過kafka-configs
命令,我們可以修改broker的配置,以及topic的配置、client和user的配置。
配置更新原理
kafka-configs命令修改配置後會被寫到對應的zookeeper的節點上持久化,之後kafka叢集重啟後還會載入這些配置,並覆蓋配置檔案的那些配置。也就是說,如果在此處設定了某個配置項,之後在配置檔案中對這個配置項的改動都不會起作用,因為被覆蓋了。
用該命令修改了配置後,可以在zk的節點下看到對應的配置內容。
節點目錄一般是 /config/entityType/entityName
,entityType可以是brokers、topics、users、clients。entityName表示具體的名稱,比如broker的id,topic的名稱等。
比如要看0號broker修改過的配置項,可以在zk互動介面中輸入
# /kafka 是名稱空間
get /kafka/config/brokers/0
修改broker配置
# 將0號broker的配置 log.cleaner.backoff.ms修改成1000,flush.ms 也修改成1000
# --alter 表示要修改配置項
# --add-config 後面跟著要修改的配置項
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --add-config log.cleaner.backoff.ms=1000,flush.ms=1000 --alter
# 刪除0號broker 對 log.cleaner.backoff.ms的配置
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --delete-config log.cleaner.backoff.ms --alter
# 列出0號broker修改過的配置項
kafka-configs --bootstrap-server 127.0.0.1:9092 --entity-type brokers --entity-name 0 --describe
修改topic的配置
# 將test這個topic的 delete.retention.ms修改成1000,flush.ms 也修改成1000
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --add-config delete.retention.ms=1000,flush.ms=1000 --alter
# 刪除test這個topic的 delete.retention.ms和flush.ms配置項
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --delete-config delete.retention.ms,flush.ms --alter
# 列出 test這個topic修改過的配置項
kafka-configs --zookeeper 127.0.0.1:2181/kafka --entity-type topics --entity-name test --describe
修改client的配置
這裡的client是指客戶端,也就是produer或者consumer。客戶端支援修改的配置有
# 請求限制
request_percentage
# 推送訊息時的流量控制
producer_byte_rate
# 消費時的流量控制
consumer_byte_rate
通過指定clientId我們可以控制指定客戶端的配置,從而控制他們的流量不會超過我們設定的值
# 設定 客戶端id 為test的 producer_byte_rate和consumer_byte_rate為1024
kafka-configs --zookeeper 127.0.0.1:2181/kafka --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=1024' --entity-type clients --entity-name test
六、檢視broker上磁碟的使用情況
在0.11版本中,新增一個命令kafka-log-dirs
可以檢視broker的磁碟使用情況。
該命令可以從兩個維度觀察磁碟的使用情況,一個是指定broker id,檢視該broker的資料目錄的各個topic parition的佔用大小。還可以直接指定topic,檢視這些topic的partition在各個broker上的使用情況。甚至可以兩個過濾條件一起用,同時指定brokerId和topic。
# 檢視0、1號broker上各個topic partition的磁碟使用情況
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --broker-list 0,1 --describe
# 檢視topic:test 在各個broker上的磁碟使用情況
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --describe
# 檢視topic test 在0號broker上的磁碟使用情況
kafka-log-dirs --bootstrap-server 127.0.0.1:9092 --topic-list test --broker-list 0 --describe
輸出示例:
{
"version": 1,
"brokers": [
{
"broker": 1001,
"logDirs": [
{
"logDir": "/kafka/kafka-logs-7da01186c90a",
"error": null,
"partitions": [
{
"partition": "test-4",
"size": 0,
"offsetLag": 0,
"isFuture": false
},
{
"partition": "test-0",
"size": 0,
"offsetLag": 0,
"isFuture": false
}
]
}
]
}
]
}
七、使用kafka-preferred-replica-election進行leader選舉
當我們檢視某個topic partition時,會輸出該partiton replica的列表,其中replica列表的第一個replica被kafka稱為preferred replica。
Topic: test Partition: 0 Leader: 1002 Replicas: 1001,1002,1003 Isr: 1002,1001,1003
上面的test partition-0中,1001就是那個preferred replica。在大多情況下,preferred replica一般就是leader,但是有些情況可能不是。因此,kafka提供了kafka-preferred-replica-election來將preferred replica選舉成leader。
首先我們需要編輯prefered.json 檔案:
{
"partitions": [
{
"topic": "test",
"partition": 0
}
]
}
該檔案告訴工具我們要對topic test的partition-0進行preferred選舉:
kafka-preferred-replica-election --zookeeper 127.0.0.1:2181 --path-to-json-file prefered.json