說說 MQ 之 Kafka(二)
Kafka 的工具和程式設計介面
Kafka 的工具
Kafka 提供的工具還是比較全的,bin/
目錄下的工具有以下一些,
bin/connect-distributed.shbin/kafka-consumer-offset-checker.shbin/kafka-replica-verification.shbin/kafka-verifiable-producer.sh bin/connect-standalone.shbin/kafka-consumer-perf-test.shbin/kafka-run-class.shbin/zookeeper-security-migration.sh bin/kafka-acls.shbin/kafka-mirror-maker.shbin/kafka-server-start.shbin/zookeeper-server-start.sh bin/kafka-configs.shbin/kafka-preferred-replica-election.shbin/kafka-server-stop.shbin/zookeeper-server-stop.sh bin/kafka-console-consumer.shbin/kafka-producer-perf-test.shbin/kafka-simple-consumer-shell.shbin/zookeeper-shell.sh bin/kafka-console-producer.shbin/kafka-reassign-partitions.shbin/kafka-topics.sh bin/kafka-consumer-groups.shbin/kafka-replay-log-producer.shbin/kafka-verifiable-consumer.sh
我常用的命令有以下幾個,
bin/kafka-server-start.sh -daemon config/server.properties & bin/kafka-topics.sh --describe --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --list --zookeeper 192.168.232.23:2181 bin/kafka-topics.sh --delete --zookeeper 192.168.232.23:2181 --topic topic1 bin/kafka-topics.sh --create --zookeeper 192.168.232.23:2181 --replication-factor 3 --partitions 2 --topic topic1 bin/kafka-console-consumer.sh --zookeeper 192.168.232.23:2181 --topic topic1 --from-beginning bin/kafka-console-producer.sh --broker-list 192.168.232.23:9092 --topic topic1
kafka-server-start.sh
是用於 Kafka 的 Broker 啟動的,主要就一個引數 config/server.properties
,該檔案中的配置項待會再說.還有一個 -daemon
引數,這個是將 Kafka 放在後臺用守護程序的方式執行,如果不加這個引數,Kafka 會在執行一段時間後自動退出,據說這個是 0.10.0.0 版本才有的問題 5。kafka-topics.sh
是用於管理 Topic 的工具,我主要用的 --describe
、--list
、--delete
、--create
這4個功能,上述的例子基本是不言自明的,--replication-factor 3
、--partitions 2
這兩個引數分別表示3個副本(含 Leader),和2個分割槽。kafka-console-consumer.sh
和 kafka-console-producer.sh
是生產者和消費者的簡易終端工具,在除錯的時候比較有用,我常用的是 kafka-console-consumer.sh
。我沒有用 Kafka 自帶的 zookeeper,而是用的 zookeeper 官方的釋出版本 3.4.8,埠是預設2181,與 Broker 在同一臺機器上。
下面說一下 Broker 啟動的配置檔案config/server.properties
,我在預設配置的基礎上,修改了以下一些,
broker.id=0 listeners=PLAINTEXT://192.168.232.23:9092 log.dirs=/tmp/kafka-logs delete.topic.enable=true
broker.id
是 Kafka 叢集中的 Broker ID,不可重複,我在多副本的實驗中,將他們分別設定為0、1、2;listeners
是 Broker 監聽的地址,預設是監聽 localhost:9092
,因為我不是單機實驗,所以修改為本機區域網地址,當然,如果要監聽所有地址的話,也可以設定為 0.0.0.0:9092
,多副本實驗中,將監聽埠分別設定為 9092、9093、9094;log.dirs
是 Broker 的 log 的目錄,多副本實驗中,不同的 Broker 需要有不同的 log 目錄;delete.topic.enable
設為 true 後,可以刪除 Topic,並且連帶 Topic 中的訊息也一併刪掉,否則,即使呼叫 kafka-topics.sh --delete
也無法刪除 Topic,這是一個便利性的設定,對於開發環境可以,生產環境一定要設為 false(預設)。實驗中發現, 如果有消費者在消費這個 Topic,那麼也無法刪除,還是比較安全的。
剩下的工具多數在文件中也有提到。如果看一下這些指令碼的話,會發現多數指令碼的寫法都是一致的,先做一些引數的校驗,最後執行exec $base_dir/kafka-run-class.sh XXXXXXXXX "$@"
,可見,這些工具都是使用執行 Java Class 的方式呼叫的。
Kafka 的 Java API
在程式設計介面方面,官方提供了 Scala 和 Java 的介面,社群提供了更多的其他語言的介面,基本上,無論用什麼語言開發,都能找到相應的 API。下面說一下 Java 的 API 介面。
生產者的 API 只有一種,相對比較簡單,程式碼如下,
import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import java.util.Properties; import java.util.concurrent.ExecutionException; public class SimpleProducerDemo { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "192.168.232.23:9092,192.168.232.23:9093,192.168.232.23:9094"); props.put("zookeeper.connect", "192.168.232.23:2181"); props.put("client.id", "DemoProducer"); props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<Integer, String> producer = new KafkaProducer<>(props); String topic = "topic1"; Boolean isAsync = false; int messageNo = 1; while (true) { String messageStr = "Message_" + String.format("%05d",messageNo); long startTime = System.currentTimeMillis(); if (isAsync) { // Send asynchronously producer.send(new ProducerRecord<>(topic, messageNo, messageStr), new DemoCallBack(startTime, messageNo, messageStr)); } else { // Send synchronously try { producer.send(new ProducerRecord<>(topic, messageNo, messageStr)).get(); System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } } try { Thread.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } ++messageNo; } } } class DemoCallBack implements Callback { private final long startTime; private final int key; private final String message; public DemoCallBack(long startTime, int key, String message) { this.startTime = startTime; this.key = key; this.message = message; } public void onCompletion(RecordMetadata metadata, Exception exception) { long elapsedTime = System.currentTimeMillis() - startTime; if (metadata != null) { System.out.println( "Sendmessage: (" + String.format("%05d",key) + ", " + message + ") at offset "+ metadata.offset() + " to partition(" + metadata.partition() + ") in " + elapsedTime + " ms"); } else { exception.printStackTrace(); } } }
上例中使用了同步和非同步傳送兩種方式。在多副本的情況下,如果要指定同步複製還是非同步複製,可以使用acks
引數,詳細參考官方文件 Producer Configs 部分的內容;在多分割槽的情況下,如果要指定傳送到哪個分割槽,可以使用 partitioner.class
引數,其值是一個實現了 org.apache.kafka.clients.producer.Partitioner
介面的類,用於根據不同的訊息指定分割槽6。消費者的 API 有幾種,比較新的 API 如下,
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class SimpleConsumer { public static void main(String[] args){ Properties props = new Properties(); props.put("bootstrap.servers", "192.168.232.23:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("topic1", "topic2", "topic3")); while (true) { ConsumerRecords<Integer, String> records = consumer.poll(100); for (ConsumerRecord<Integer, String> record : records) { System.out.println("Received message: (" + String.format("%05d", record.key()) + ", " + record.value() + ") at offset " + record.offset()); } } } }
消費者還有舊的 API,比如Consumer
和 SimpleConsumer
API,這些都可以從 Kafka 程式碼的 kafka-example 中找到,上述的兩個例子也是改寫自 kafka-example。使用新舊 API 在功能上都能滿足訊息收發的需要,但新 API 只依賴 kafka-clients
,打包出來的 jar 包會小很多,以我的測試,新 API 的消費者 jar 包大約有 2M 左右,而舊 API 的消費者 jar 包接近 16M。
其實,Kafka 也提供了按分割槽訂閱,可以一次訂閱多個分割槽TopicPartition[]
;也支援手動提交 offset,需要呼叫 consumer.commitSync
。
Kafka 似乎沒有公開 Topic 建立以及修改的 API(至少我沒有找到),如果生產者向 Broker 寫入的 Topic 是一個新 Topic,那麼 Broker 會建立這個 Topic。建立的過程中會使用預設引數,例如,分割槽個數,會使用 Broker 配置中的num.partitions
引數(預設1);副本個數,會使用 default.replication.factor
引數。但是通常情況下,我們會需要建立自定義的 Topic,那官方的途徑是使用 Kafka 的工具。也有一些非官方的途徑 7,例如可以這樣寫,
String[] options = new String[]{ "--create", "--zookeeper", "192.168.232.23:2181", "--partitions", "2", "--replication-factor", "3", "--topic", "topic1" }; TopicCommand.main(options);
但是這樣寫有一個問題,在執行完TopicCommand.main(options);
之後,系統會自動退出,原因是執行完指令之後,會呼叫 System.exit(exitCode);
系統直接退出。這樣當然不行,我的辦法是,把相關的執行程式碼挖出來,寫一個 TopicUtils 類,如下,
import joptsimple.OptionSpecBuilder; import kafka.admin.TopicCommand; import kafka.admin.TopicCommand$; import kafka.utils.ZkUtils; import org.apache.kafka.common.security.JaasUtils; import scala.runtime.Nothing$; public class TopicUtils { // from: http://blog.csdn.net/changong28/article/details/39325079 // from: http://www.cnblogs.com/davidwang456/p/4313784.html public static void createTopic(){ String[] options = new String[]{ "--create", "--zookeeper", KafkaProperties.ZOOKEEPER_URL, "--partitions", "2", "--replication-factor", "3", "--topic", KafkaProperties.TOPIC }; //TopicCommand.main(options); oper(options); } public static void listTopic(){ String[] options = new String[]{ "--list", "--zookeeper", KafkaProperties.ZOOKEEPER_URL }; //TopicCommand.main(options); oper(options); } public static void deleteTopic(){ String[] options = new String[]{ "--delete", "--zookeeper", KafkaProperties.ZOOKEEPER_URL, "--topic", KafkaProperties.TOPIC }; //TopicCommand.main(options); oper(options); } public static void describeTopic(){ String[] options = new String[]{ "--describe", "--zookeeper", KafkaProperties.ZOOKEEPER_URL, "--topic", KafkaProperties.TOPIC }; //TopicCommand.main(options); oper(options); } public static void main(String[] args){ listTopic(); createTopic(); listTopic(); describeTopic(); deleteTopic(); try { Thread.sleep(3*1000); } catch (InterruptedException e) { e.printStackTrace(); } listTopic(); } /** copied & modified from kafka.admin.TopicCommand$.main * * @param args */ public static void oper(String args[]){ try { TopicCommand$ topicCommand$ = TopicCommand$.MODULE$; final TopicCommand.TopicCommandOptions opts = new TopicCommand.TopicCommandOptions(args); if(args.length == 0) { throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Create, delete, describe, or change a topic."); } else { int actions =0; OptionSpecBuilder[] optionSpecBuilders = {opts.createOpt(), opts.listOpt(), opts.alterOpt(), opts.describeOpt(), opts.deleteOpt()}; for (OptionSpecBuilder temp:optionSpecBuilders){ if (opts.options().has(temp)) { actions++; } } if(actions != 1) { throw kafka.utils.CommandLineUtils$.MODULE$.printUsageAndDie(opts.parser(), "Command must include exactly one action: --list, --describe, --create, --alter or --delete"); } else { opts.checkArgs(); ZkUtils zkUtils = kafka.utils.ZkUtils$.MODULE$.apply((String)opts.options().valueOf(opts.zkConnectOpt()), 30000, 30000, JaasUtils.isZkSecurityEnabled()); byte exitCode = 0; try { try { if(opts.options().has(opts.createOpt())) { topicCommand$.createTopic(zkUtils, opts); } else if(opts.options().has(opts.alterOpt())) { topicCommand$.alterTopic(zkUtils, opts); } else if(opts.options().has(opts.listOpt())) { topicCommand$.listTopics(zkUtils, opts); } else if(opts.options().has(opts.describeOpt())) { topicCommand$.describeTopic(zkUtils, opts); } else if(opts.options().has(opts.deleteOpt())) { topicCommand$.deleteTopic(zkUtils, opts); } } catch (final Throwable var12) { scala.Predef$.MODULE$.println((new StringBuilder()).append("Error while executing topic command : ").append(var12.getMessage()).toString()); System.out.println(var12); exitCode = 1; return; } } finally { zkUtils.close(); //System.exit(exitCode); } } } } catch (Nothing$ nothing$) { nothing$.printStackTrace(); } } }
以上的oper
方法改寫自 kafka.admin.TopicCommand$.main
方法。可以發現這部分程式碼非常怪異,原因是 TopicCommand$
是 Scala 寫的,再編譯成 Java class 位元組碼,然後我根據這些位元組碼反編譯得到 Java 程式碼,並以此為基礎進行修改,等於是我在用 Java 的方式改寫 Scala 的程式碼,難免會覺得詭異。當然,這種寫法用在生產環境的話是不太合適的,因為呼叫的 topicCommand$.createTopic
等方法都沒有丟擲異常,例如引數不合法的情況,而且也沒有使用 log4j 之類的 log 庫,只是用 System.out.println
這樣的方法屏顯,在出現錯誤的時候,比較難以定位。
參考文章
- ofollow,noindex" target="_blank">http://kafka.apache.org/documentation.html ↩
- http://www.jianshu.com/p/453c6e7ff81c ↩
- http://www.infoq.com/cn/author/%E9%83%AD%E4%BF%8A#文章 ↩
- http://developer.51cto.com/art/201501/464491.htm ↩
- https://segmentfault.com/q/1010000004292925 ↩
- http://www.cnblogs.com/gnivor/p/5318319.html ↩
- http://www.cnblogs.com/davidwang456/p/4313784.html ↩
- http://www.jianshu.com/p/8689901720fd ↩
- http://zqhxuyuan.github.io/2016/05/26/2016-05-13-Kafka-Book-Sample/ ↩
- http://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/ ↩