Expanding a Kafka Cluster and Reassigning Partitions

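Adding machines to an already deployed Kafka cluster is a perfectly ordinary requirement, and it is easy to do: copy the configuration file from an existing Kafka node, change the broker id in it to a value that is unique across the cluster, and start the node. It then joins the existing Kafka cluster.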
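As a minimal sketch of the "copy the configuration and change the broker id" step, the helper below rewrites the id in the text of a broker configuration. It assumes the id is stored as a `broker.id=...` line (as in a typical `server.properties`). The sample property values and the function name `with_broker_id` are illustrative and not taken from the article.

```python
import re


def with_broker_id(properties_text: str, new_id: int) -> str:
    """Return a copy of a broker config text with its broker.id replaced."""
    # Match only an active (uncommented) broker.id line.
    pattern = re.compile(r"^broker\.id\s*=.*$", flags=re.MULTILINE)
    if not pattern.search(properties_text):
        raise ValueError("no broker.id line found in the configuration")
    return pattern.sub(f"broker.id={new_id}", properties_text)


# Hypothetical config copied from an existing node (values are examples only).
existing = (
    "broker.id=1\n"
    "log.dirs=/tmp/kafka-logs\n"
    "zookeeper.connect=www.iteblog.com:2181\n"
)

new_config = with_broker_id(existing, 5)
print(new_config)
```

Every other setting is left untouched, which matches the idea that the new node differs from its template only by its id.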

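The catch is that a newly added Kafka node is not assigned any existing data automatically, so it does not share the cluster's load unless we create a new topic. To move some existing partitions onto the new node by hand, we can use a tool that ships with Kafka for reassigning the partitions of a topic. Before reassigning anything, let's look at where the partitions of the topic currently live: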

./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181
Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
  Topic: iteblog  Partition: 0  Leader: 1 Replicas: 1,2 Isr: 1,2
  Topic: iteblog  Partition: 1  Leader: 2 Replicas: 2,3 Isr: 2,3
  Topic: iteblog  Partition: 2  Leader: 3 Replicas: 3,4 Isr: 3,4
  Topic: iteblog  Partition: 3  Leader: 4 Replicas: 4,1 Isr: 4,1
  Topic: iteblog  Partition: 4  Leader: 1 Replicas: 1,3 Isr: 1,3
  Topic: iteblog  Partition: 5  Leader: 2 Replicas: 2,4 Isr: 2,4
  Topic: iteblog  Partition: 6  Leader: 3 Replicas: 3,1 Isr: 3,1

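The output shows that the iteblog topic has 7 partitions but the cluster has only 4 brokers, so some brokers hold more partitions than others. We now add one more Kafka node to the existing cluster and use the kafka-reassign-partitions.sh tool that ships with Kafka to redistribute the partitions. The tool has three modes: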

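  1. generate mode: given the topics to reassign, automatically generates a reassignment plan (without executing it)
  2. execute mode: reassigns partitions according to a given reassignment plan
  3. verify mode: checks whether the partition reassignment has completed successfully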
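The three modes differ only in the flags passed to the tool. The sketch below assembles the argument list for each mode, using only the flags that appear in the commands in this article. The function name `reassign_command` is hypothetical, and other Kafka versions may expect different connection flags.

```python
def reassign_command(mode, zookeeper, json_file, broker_list=None):
    """Build the argv list for kafka-reassign-partitions.sh in a given mode."""
    base = ["bin/kafka-reassign-partitions.sh", "--zookeeper", zookeeper]
    if mode == "generate":
        # generate needs the topics file and the target brokers.
        if not broker_list:
            raise ValueError("generate mode needs a broker list")
        brokers = ",".join(str(b) for b in broker_list)
        return base + [
            "--topics-to-move-json-file", json_file,
            "--broker-list", brokers,
            "--generate",
        ]
    if mode in ("execute", "verify"):
        # execute and verify both take the reassignment plan file.
        return base + ["--reassignment-json-file", json_file, f"--{mode}"]
    raise ValueError(f"unknown mode: {mode}")


cmd = reassign_command(
    "generate", "www.iteblog.com:2181", "topics-to-move.json", [1, 2, 3, 4, 5]
)
print(" ".join(cmd))
```

The resulting list could be handed to `subprocess.run` on a machine where the Kafka scripts are installed.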

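We now want to spread the partitions that currently live on brokers 1-4 across brokers 1-5, using kafka-reassign-partitions.sh to generate the reassignment plan. First, though, we have to define a file in the format the tool expects, stating which topics should be reassigned. Its contents are: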

[iteblog@www.iteblog.com ~]$ cat topics-to-move.json
{"topics": [{"topic": "iteblog"}], "version":1 }
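When several topics need to be moved, the same file can be produced programmatically. This is a small sketch that builds the structure shown above. The helper name `topics_to_move` is made up for illustration.

```python
import json


def topics_to_move(topics):
    """Build the document expected by --topics-to-move-json-file."""
    return {"topics": [{"topic": name} for name in topics], "version": 1}


doc = topics_to_move(["iteblog"])
text = json.dumps(doc)
print(text)

# To write it next to the Kafka scripts, something like:
# with open("topics-to-move.json", "w") as fh:
#     fh.write(text)
```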

Then use kafka-reassign-partitions.sh to generate the reassignment plan:


[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5" --generate
Current partition replica assignment

{"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[4,1]},{"topic":"iteblog","partition":5,"replicas":[2,4]},{"topic":"iteblog","partition":4,"replicas":[1,3]},{"topic":"iteblog","partition":0,"replicas":[1,2]},{"topic":"iteblog","partition":6,"replicas":[3,1]},{"topic":"iteblog","partition":1,"replicas":[2,3]},{"topic":"iteblog","partition":2,"replicas":[3,4]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[3,5]},{"topic":"iteblog","partition":5,"replicas":[5,3]},{"topic":"iteblog","partition":4,"replicas":[4,1]},{"topic":"iteblog","partition":0,"replicas":[5,2]},{"topic":"iteblog","partition":6,"replicas":[1,4]},{"topic":"iteblog","partition":1,"replicas":[1,3]},{"topic":"iteblog","partition":2,"replicas":[2,4]}]}

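The JSON printed under Proposed partition reassignment configuration is the proposed placement of the partitions across brokers 1-5. Save it to a file named result.json (the file name does not matter, and the extension need not be .json as long as the content is valid JSON), then execute the plan: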


[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"iteblog","partition":3,"replicas":[4,1]},{"topic":"iteblog","partition":5,"replicas":[2,4]},{"topic":"iteblog","partition":4,"replicas":[1,3]},{"topic":"iteblog","partition":0,"replicas":[1,2]},{"topic":"iteblog","partition":6,"replicas":[3,1]},{"topic":"iteblog","partition":1,"replicas":[2,3]},{"topic":"iteblog","partition":2,"replicas":[3,4]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions {"version":1,"partitions":[{"topic":"iteblog","partition":1,"replicas":[1,3]},{"topic":"iteblog","partition":5,"replicas":[5,3]},{"topic":"iteblog","partition":4,"replicas":[4,1]},{"topic":"iteblog","partition":6,"replicas":[1,4]},{"topic":"iteblog","partition":2,"replicas":[2,4]},{"topic":"iteblog","partition":0,"replicas":[5,2]},{"topic":"iteblog","partition":3,"replicas":[3,5]}]}
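The tool prints the current assignment so that it can be saved for a rollback, and in generate mode it also prints the proposed plan. A minimal sketch for pulling either JSON document out of the captured output follows. It assumes the layout seen in the transcripts here (a heading line, then the JSON on the next non-empty line). The sample text is abbreviated to one partition, and `json_after_heading` is a hypothetical helper.

```python
import json


def json_after_heading(output: str, heading: str) -> dict:
    """Parse the first non-empty line that follows the given heading line."""
    lines = output.splitlines()
    for i, line in enumerate(lines):
        if line.strip() == heading:
            for candidate in lines[i + 1:]:
                if candidate.strip():
                    return json.loads(candidate)
    raise ValueError(f"no JSON found after heading: {heading}")


# Abbreviated sample in the shape of the --generate output.
sample = """Current partition replica assignment

{"version":1,"partitions":[{"topic":"iteblog","partition":0,"replicas":[1,2]}]}
Proposed partition reassignment configuration

{"version":1,"partitions":[{"topic":"iteblog","partition":0,"replicas":[5,2]}]}
"""

rollback = json_after_heading(sample, "Current partition replica assignment")
proposed = json_after_heading(sample, "Proposed partition reassignment configuration")
print(rollback["partitions"][0]["replicas"], proposed["partitions"][0]["replicas"])
```

Writing `rollback` to its own file keeps a plan at hand that can be passed to --reassignment-json-file if the move has to be undone.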

Kafka is now executing the reassignment plan. We can check whether it has finished:


[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --verify
Status of partition reassignment:
Reassignment of partition [iteblog,1] completed successfully
Reassignment of partition [iteblog,5] is still in progress
Reassignment of partition [iteblog,4] completed successfully
Reassignment of partition [iteblog,6] completed successfully
Reassignment of partition [iteblog,2] completed successfully
Reassignment of partition [iteblog,0] is still in progress
Reassignment of partition [iteblog,3] completed successfully

[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --verify
Status of partition reassignment:
Reassignment of partition [iteblog,1] completed successfully
Reassignment of partition [iteblog,5] completed successfully
Reassignment of partition [iteblog,4] completed successfully
Reassignment of partition [iteblog,6] completed successfully
Reassignment of partition [iteblog,2] completed successfully
Reassignment of partition [iteblog,0] completed successfully
Reassignment of partition [iteblog,3] completed successfully
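Rather than reading the status lines by eye, a script can parse them and report which partitions are still moving. The sketch below assumes the exact line format shown in the transcripts above. Other Kafka versions may word these lines differently, and the helper names are illustrative.

```python
import re

# Matches lines such as: Reassignment of partition [iteblog,5] is still in progress
STATUS_LINE = re.compile(
    r"^Reassignment of partition \[(?P<topic>[^,\]]+),(?P<partition>\d+)\] (?P<status>.+)$"
)


def parse_verify(output: str) -> dict:
    """Map (topic, partition) to the status text printed by --verify."""
    statuses = {}
    for line in output.splitlines():
        match = STATUS_LINE.match(line.strip())
        if match:
            key = (match["topic"], int(match["partition"]))
            statuses[key] = match["status"]
    return statuses


def pending(statuses: dict) -> list:
    """Return the partitions that have not completed successfully, sorted."""
    return sorted(k for k, s in statuses.items() if s != "completed successfully")


first_run = """Status of partition reassignment:
Reassignment of partition [iteblog,1] completed successfully
Reassignment of partition [iteblog,5] is still in progress
Reassignment of partition [iteblog,4] completed successfully
Reassignment of partition [iteblog,0] is still in progress
"""

print(pending(parse_verify(first_run)))
```

A loop that reruns the verify command and stops once `pending` returns an empty list would automate the wait between the two runs shown above.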

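As the output shows, a partition that is still being moved is reported as still in progress, and one that has finished is reported as completed successfully. Once every partition has completed, the partitions are laid out according to the generated reassignment plan. Let's look at the current distribution of the topic's partitions: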


[iteblog@www.iteblog.com ~]$ ./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181
Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
  Topic: iteblog  Partition: 0  Leader: 5 Replicas: 5,2 Isr: 2,5
  Topic: iteblog  Partition: 1  Leader: 1 Replicas: 1,3 Isr: 3,1
  Topic: iteblog  Partition: 2  Leader: 2 Replicas: 2,4 Isr: 4,2
  Topic: iteblog  Partition: 3  Leader: 3 Replicas: 3,5 Isr: 3,5
  Topic: iteblog  Partition: 4  Leader: 1 Replicas: 4,1 Isr: 1,4
  Topic: iteblog  Partition: 5  Leader: 5 Replicas: 5,3 Isr: 3,5
  Topic: iteblog  Partition: 6  Leader: 1 Replicas: 1,4 Isr: 1,4

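The distribution has indeed changed, and broker 5 now holds partitions. Attentive readers will notice, however, that broker 4 is not the Leader of any partition, which is certainly not what we want! The reassignment plan generated by kafka-reassign-partitions.sh is only a suggestion offered for convenience. We are free to write a reassignment plan ourselves and execute it, for example: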

{
  "version": 1,
  "partitions": [
    {"topic": "iteblog", "partition": 0, "replicas": [1, 2]},
    {"topic": "iteblog", "partition": 1, "replicas": [2, 3]},
    {"topic": "iteblog", "partition": 2, "replicas": [3, 4]},
    {"topic": "iteblog", "partition": 3, "replicas": [4, 5]},
    {"topic": "iteblog", "partition": 4, "replicas": [5, 1]},
    {"topic": "iteblog", "partition": 5, "replicas": [1, 3]},
    {"topic": "iteblog", "partition": 6, "replicas": [2, 4]}
  ]
}

Save the JSON above to result.json, then execute it as before:


[iteblog@www.iteblog.com ~]$ bin/kafka-reassign-partitions.sh --zookeeper www.iteblog.com:2181 --reassignment-json-file result.json --execute
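A hand-written plan like this one does not have to be typed out. The sketch below generates one with a simple round-robin rule: the first replica cycles through the brokers, and the remaining replicas are offset by one more position on each pass over the broker list. This rule is an assumption chosen for illustration, not Kafka's own assignment algorithm, although for 7 partitions on brokers 1-5 it happens to produce the same replica lists as the plan executed here.

```python
import json


def round_robin_plan(topic, num_partitions, brokers, replication_factor=2):
    """Build a reassignment plan that cycles the first replica over brokers."""
    n = len(brokers)
    if not 1 <= replication_factor <= n:
        raise ValueError("replication factor must be between 1 and the broker count")
    partitions = []
    for p in range(num_partitions):
        first = p % n
        # Extra offset per pass over the broker list, bounded so that a
        # follower never lands on the same broker as the first replica.
        shift = (p // n) % (n - replication_factor + 1)
        replicas = [brokers[first]]
        for r in range(1, replication_factor):
            replicas.append(brokers[(first + r + shift) % n])
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


plan = round_robin_plan("iteblog", 7, [1, 2, 3, 4, 5])
print(json.dumps(plan, indent=2))
```

Because the first replica in each list is the one Kafka prefers as leader, cycling it over all brokers is what spreads leadership evenly.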

Once this reassignment plan has finished, let's look at the partition distribution again:


[iteblog@www.iteblog.com ~]$ ./bin/kafka-topics.sh --topic iteblog --describe --zookeeper www.iteblog.com:2181
Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
  Topic: iteblog  Partition: 0  Leader: 1 Replicas: 1,2 Isr: 2,1
  Topic: iteblog  Partition: 1  Leader: 2 Replicas: 2,3 Isr: 3,2
  Topic: iteblog  Partition: 2  Leader: 3 Replicas: 3,4 Isr: 4,3
  Topic: iteblog  Partition: 3  Leader: 4 Replicas: 4,5 Isr: 5,4
  Topic: iteblog  Partition: 4  Leader: 5 Replicas: 5,1 Isr: 1,5
  Topic: iteblog  Partition: 5  Leader: 1 Replicas: 1,3 Isr: 3,1
  Topic: iteblog  Partition: 6  Leader: 2 Replicas: 2,4 Isr: 4,2

The partitions are now distributed exactly as we specified.
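To confirm the balance without scanning the output by hand, the leaders can be counted per broker. The following sketch parses the `Partition: ... Leader: ...` fields from the two describe outputs shown in this article. The parsing is based only on the format seen here, and the function name is hypothetical.

```python
import re
from collections import Counter

PARTITION_LINE = re.compile(r"Partition:\s*(\d+)\s+Leader:\s*(\d+)")


def leaders_per_broker(describe_output: str) -> dict:
    """Count how many partitions each broker currently leads."""
    counts = Counter()
    for match in PARTITION_LINE.finditer(describe_output):
        counts[int(match.group(2))] += 1
    return dict(sorted(counts.items()))


after_generated_plan = """Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
  Topic: iteblog  Partition: 0  Leader: 5 Replicas: 5,2 Isr: 2,5
  Topic: iteblog  Partition: 1  Leader: 1 Replicas: 1,3 Isr: 3,1
  Topic: iteblog  Partition: 2  Leader: 2 Replicas: 2,4 Isr: 4,2
  Topic: iteblog  Partition: 3  Leader: 3 Replicas: 3,5 Isr: 3,5
  Topic: iteblog  Partition: 4  Leader: 1 Replicas: 4,1 Isr: 1,4
  Topic: iteblog  Partition: 5  Leader: 5 Replicas: 5,3 Isr: 3,5
  Topic: iteblog  Partition: 6  Leader: 1 Replicas: 1,4 Isr: 1,4
"""

after_custom_plan = """Topic:iteblog PartitionCount:7  ReplicationFactor:2 Configs:
  Topic: iteblog  Partition: 0  Leader: 1 Replicas: 1,2 Isr: 2,1
  Topic: iteblog  Partition: 1  Leader: 2 Replicas: 2,3 Isr: 3,2
  Topic: iteblog  Partition: 2  Leader: 3 Replicas: 3,4 Isr: 4,3
  Topic: iteblog  Partition: 3  Leader: 4 Replicas: 4,5 Isr: 5,4
  Topic: iteblog  Partition: 4  Leader: 5 Replicas: 5,1 Isr: 1,5
  Topic: iteblog  Partition: 5  Leader: 1 Replicas: 1,3 Isr: 3,1
  Topic: iteblog  Partition: 6  Leader: 2 Replicas: 2,4 Isr: 4,2
"""

print(leaders_per_broker(after_generated_plan))  # {1: 3, 2: 1, 3: 1, 5: 2}
print(leaders_per_broker(after_custom_plan))     # {1: 2, 2: 2, 3: 1, 4: 1, 5: 1}
```

Broker 4 is missing from the first result and present in the second, which is the difference the custom plan was written to achieve.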
