Using Java and ZooKeeper to do what kafka-topics.sh does

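Requirement

For work, I needed to add partitions to every topic in the Kafka cluster of our test environment. There are a lot of topics, so running the command by hand for each one was not realistic. (While writing this post it occurred to me that it is doable after all: a script with a loop would be enough, and probably simpler -_-||.) So I looked for a way to do it from code through a client connection.

The code is as follows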

import kafka.admin.AdminUtils;
import kafka.admin.RackAwareMode;
import kafka.utils.ZkUtils;
import org.apache.kafka.common.security.JaasUtils;
import org.junit.Test;
import scala.collection.JavaConversions;
import scala.collection.Map;
import scala.collection.Seq;

import java.util.List;
import java.util.Properties;

/**
 * Created by NightWatch on 2018/12/11.
 */
public class TopicSh {

    // ZooKeeper connection string (host:port)
    private static String hostAndPort = "10.1.24.216:2181";

    @Test
    // Create a topic with 1 partition and a replication factor of 1
    public void createTopic() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        AdminUtils.createTopic(zkUtils, "topic-name", 1, 1, new Properties(), RackAwareMode.Enforced$.MODULE$);
        zkUtils.close();
    }

    @Test
    // List all topics
    public void listTopicOne() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Map<String, Properties> map = AdminUtils.fetchAllTopicConfigs(zkUtils);
        java.util.Map<String, Properties> javaMap = JavaConversions.asJavaMap(map);
        for (java.util.Map.Entry<String, Properties> entry : javaMap.entrySet()) {
            // Topic name
            String topic = entry.getKey();
            System.out.println(topic);
            // TODO: I haven't figured out what this value is yet; I'll look into it when I have time
            Properties value = entry.getValue();
        }
        zkUtils.close();
    }

    @Test
    // List all topics; this version is a bit simpler
    public void listTopicTwo() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        Seq<String> allTopics = zkUtils.getAllTopics();
        List<String> topicsList = JavaConversions.seqAsJavaList(allTopics);
        // TODO: I haven't figured out this syntax either, same as above
        topicsList.forEach(System.out::println);
        zkUtils.close();
    }

    @Test
    // Change the partition count of every topic to 4
    public void alterTopicPartition() {
        ZkUtils zkUtils = ZkUtils.apply(hostAndPort, 30000, 30000, JaasUtils.isZkSecurityEnabled());
        List<String> topics = JavaConversions.seqAsJavaList(zkUtils.getAllTopics());
        for (String topic : topics) {
            AdminUtils.addPartitions(zkUtils, topic, 4, "", true, RackAwareMode.Enforced$.MODULE$);
        }
        zkUtils.close();
    }
}
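One thing to watch in alterTopicPartition: the loop asks for 4 partitions on every topic it finds. As far as I know, Kafka only lets a partition count grow, so a topic that already has 4 or more partitions would make the call fail, and the list can also include Kafka's internal topics. Below is a minimal sketch of filtering the list first. PartitionPlan and topicsToGrow are hypothetical names, the partition counts are made-up sample data, and treating names that start with "__" as internal is an assumption. How the current counts are fetched is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: decides which topics still need more partitions.
final class PartitionPlan {

    // Returns the topics whose current partition count is below the target.
    // Assumption: names starting with "__" are internal topics and are skipped.
    static List<String> topicsToGrow(Map<String, Integer> currentCounts, int target) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : currentCounts.entrySet()) {
            String topic = entry.getKey();
            if (topic.startsWith("__")) {
                continue; // leave internal topics alone
            }
            if (entry.getValue() < target) {
                result.add(topic);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample data only; real counts would come from the cluster.
        Map<String, Integer> counts = new LinkedHashMap<>();
        counts.put("orders", 1);
        counts.put("logs", 4);
        counts.put("metrics", 8);
        counts.put("__consumer_offsets", 50);
        System.out.println(topicsToGrow(counts, 4)); // prints [orders]
    }
}
```

Only the topics returned by such a filter would then be passed to AdminUtils.addPartitions.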

Problems encountered

Running it produced the following error:

org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids

	at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:1000)
	at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:676)
	at org.I0Itec.zkclient.ZkClient.getChildren(ZkClient.java:672)
	at kafka.utils.ZkUtils.getChildrenParentMayNotExist(ZkUtils.scala:568)
	at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:161)
	at kafka.admin.AdminUtils$.getBrokerMetadatas(AdminUtils.scala:380)
	at kafka.admin.AdminUtils$.createTopic(AdminUtils.scala:402)
	at kafka.admin.AdminUtils.createTopic(AdminUtils.scala)
	at com.bicon.kafka.TopicSh.createTopic(TopicSh.java:26)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
	at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
	at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
	at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:119)
	at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:42)
	at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:234)
	at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:74)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: org.apache.zookeeper.KeeperException$NoAuthException: KeeperErrorCode = NoAuth for /brokers/ids
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:113)
	at org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
	at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1472)
	at org.apache.zookeeper.ZooKeeper.getChildren(ZooKeeper.java:1500)
	at org.I0Itec.zkclient.ZkConnection.getChildren(ZkConnection.java:119)
	at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:679)
	at org.I0Itec.zkclient.ZkClient$4.call(ZkClient.java:676)
	at org.I0Itec.zkclient.ZkClient.retryUntilConnected(ZkClient.java:990)
	... 35 more

The error says NoAuth, that is, not authorized. At first I guessed it might be a file permission problem,
so I looked at ZooKeeper's dataDir directory:

[[email protected] zookeeper]# ll
總用量 4
drwxr-xr-x 2 root root 4096 12月  7 09:41 version-2

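Sure enough, non-root users have no write permission there. But I doubted that was the cause, so I searched on Baidu, and it really was not the reason.
It turns out that ZooKeeper can set ACLs on its nodes. I logged in to the ZooKeeper command line to check: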

[[email protected] kafka_2.11-0.11.0.1]# ./bin/zookeeper-shell.sh 28.2.5.89:2181
Connecting to 28.2.5.89:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /
[cluster, controller, brokers, zookeeper, kafka-acl, kafka-acl-changes, admin, isr_change_notification, controller_epoch, kafka-manager, consumers, latest_producer_id_block, config]
getAcl /brokers/ids
'ip,'10.1.24.216
: cdrwa
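To read the getAcl output above: a ZooKeeper ACL entry is a scheme, an id, and a set of permission letters, where c, d, r, w, a stand for create, delete, read, write, and admin. The ip scheme grants those permissions to clients connecting from the given address, and listing the children of a node needs the read permission, which is the call that failed in the stack trace. Here is a small sketch that decodes such an entry and checks a client against it. AclEntry is a hypothetical helper, not ZooKeeper's API, the client address 192.0.2.10 is made up, and the matching is simplified to an exact address comparison.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not ZooKeeper's API: decodes "scheme:id:perms" ACL strings.
final class AclEntry {
    final String scheme;
    final String id;
    final List<String> permissions;

    AclEntry(String scheme, String id, List<String> permissions) {
        this.scheme = scheme;
        this.id = id;
        this.permissions = permissions;
    }

    // Parses the form used by setAcl, e.g. "ip:10.1.24.216:cdrwa".
    static AclEntry parse(String spec) {
        int first = spec.indexOf(':');
        int last = spec.lastIndexOf(':');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException("expected scheme:id:perms, got " + spec);
        }
        List<String> names = new ArrayList<>();
        for (char letter : spec.substring(last + 1).toCharArray()) {
            names.add(permissionName(letter));
        }
        return new AclEntry(spec.substring(0, first), spec.substring(first + 1, last), names);
    }

    // Maps one permission letter to its name.
    static String permissionName(char letter) {
        switch (letter) {
            case 'c': return "CREATE";
            case 'd': return "DELETE";
            case 'r': return "READ";
            case 'w': return "WRITE";
            case 'a': return "ADMIN";
            default: throw new IllegalArgumentException("unknown permission letter: " + letter);
        }
    }

    // Simplified check (assumption): "ip" matches one exact address, "world:anyone" matches everyone.
    boolean allows(String clientIp, String permission) {
        boolean idMatches = ("world".equals(scheme) && "anyone".equals(id))
                || ("ip".equals(scheme) && id.equals(clientIp));
        return idMatches && permissions.contains(permission);
    }

    public static void main(String[] args) {
        String client = "192.0.2.10"; // made-up client address
        System.out.println(AclEntry.parse("ip:10.1.24.216:cdrwa").allows(client, "READ")); // prints false
        System.out.println(AclEntry.parse("world:anyone:cdrwa").allows(client, "READ"));   // prints true
    }
}
```

Under these assumptions, a client at any address other than 10.1.24.216 is refused on /brokers/ids, while a world:anyone entry lets every client through.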

Sure enough, someone had set an ACL on this node. So I changed the permissions:

setAcl /brokers/ids world:anyone:cdrwa
cZxid = 0x5
ctime = Wed Dec 06 20:17:05 CST 2017
mZxid = 0x5
mtime = Wed Dec 06 20:17:05 CST 2017
pZxid = 0x1fd7
cversion = 63
dataVersion = 0
aclVersion = 2
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
getAcl /brokers/ids
'world,'anyone
: cdrwa

I ran the program again, and it succeeded!

Note: this is a test environment, so loosening the permissions like this does no harm here, but please don't copy me.

Finally, here is the article that pointed me in the right direction: https://my.oschina.net/anxiaole/blog/1814143