1. 程式人生 > >kafka常用運維命令

kafka常用運維命令

列出所有topic
bin/kafka-topics.sh --zookeeper localhost:2181 --list
說明:其實就是去檢查zk上節點的/brokers/topics子節點,打印出來

建立topic
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic order_ledger_check_complete_test --partitions 4 --replication-factor 3
說明:線上環境我們會將自動建立topic禁用掉,改為手動建立(auto.create.topics.enable=false),parttitions和replication-factor是兩個必備選項,第一個引數是消費並行度的一個重要引數,第二個極大提高了topic的可用性.備份因子預設是1,相當於沒有備份,注意其值不能大於broker個數,否則會報錯。同時還可以指定topic級別的配置引數,這種特定的配置會覆蓋掉預設配置,並且儲存在zookeeper的/config/topics/[topic_name]節點資料裡。--alter  --config  --deleteConfig


刪除topic
bin/kafka-topics.sh --zookeeper localhost:2181 --topic payment_completed --delete
說明:在0.8.2.1之前的版本一般不建議之行刪除操作,因為有各種各樣的bug存在,目前的版本穩定些,同時我們需要將配置引數開啟(delete.topic.enable=true),刪除操作其實是通過更改一個zk節點,由另外的刪除執行緒非同步做的topicdeletionmanager。

增加partitions:
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic topic_test --partitions 10
說明:只能增加,不能減少。如果原有分散策略是hash的方式,將會受影響。傳送端(預設10分鐘會重新整理本地儲存元資訊)和消費端都無需重啟即可生效。


增加broker:


需要將安裝包拷貝到對應伺服器上,修改broker.id,不能與現有系統中broker id衝突,然後建立好對應的日誌目錄和資料目錄等。注意的是,現有partition會繼續保持到其他broker上面,新建立topic才可能分配到該新機器上面。如果需要保持整個kafka叢集比較均衡,需要手動對現有資料進行遷移,儘量遷移非leader的partition,利用partition reassignment tools
1.--generate
也可以對候選的進行適當調整

[[email protected] kafka_2.10-0.8.2.1]$ cat topics-to-move.json

{"version":1,

 "partitions":[{"topic":"production_process_flow_test"},

               {"topic":"production_process_flow_dev"}

}

[[email protected]_2.10-0.8.2.1]$ bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2" --generate

Current partition replica assignment

{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

2.--execute 

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --execute

Current partition replica assignment

{"version":1,"partitions":[{"topic":"production_process_flow_test","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":2,"replicas":[1,3]},{"topic":"production_process_flow_dev","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_dev","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_test","partition":1,"replicas":[3,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]}]}

Save this to use as the --reassignment-json-file option during rollback

Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"production_process_flow_test","partition":1,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":3,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":3,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":0,"replicas":[2,1]},{"topic":"production_process_flow_dev","partition":1,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":0,"replicas":[1,2]},{"topic":"production_process_flow_test","partition":2,"replicas":[1,2]},{"topic":"production_process_flow_dev","partition":2,"replicas":[2,1]}]}

3.--verify

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file a.json --verify

Status of partition reassignment:

Reassignment of partition [production_process_flow_test,1] completed successfully

Reassignment of partition [production_process_flow_dev,3] completed successfully

Reassignment of partition [production_process_flow_test,3] completed successfully

Reassignment of partition [production_process_flow_dev,0] completed successfully

Reassignment of partition [production_process_flow_dev,1] completed successfully

Reassignment of partition [production_process_flow_test,0] completed successfully

Reassignment of partition [production_process_flow_test,2] completed successfully

Reassignment of partition [production_process_flow_dev,2] completed successfully

下線某臺機器分兩種情況:
1.資料完整:直接將資料目錄拷貝到新機器上,同時保持kafka安裝配置不變,啟動起來就行

2.資料不完整:這種情況下比較麻煩,需要對所有topic進行describe,如果發現有replica在這臺下線機器上,需要用partition reassignment tool進行遷移工作

增減replica:
只需要編輯 --reassignment-json-file,新增或者減少broker id即可。其他操作可以依賴partition reassignment tool 的--execute --verify.

消費訊息
watch --interval=2 bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic payment_completed_dev --from-beginning

查詢消費資訊
bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group group_order_ledger --topic payment_completed_dev

寫入訊息效能測試
bin/kafka-producer-perf-test.sh --broker-list kafka1.weibo.com:9092 --batch-size 1 --message-size 1024 --messages 10000 --sync --topics topic_test