
Fetching N Messages from Kafka

The solution, up front:
./kafka-console-consumer.sh --topic xxxxxx --bootstrap-server localhost:9092 --max-messages 10 --from-beginning

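Our business teams use Kafka, and sometimes I want to pull a bit of data out of it for testing. Using --from-beginning on its own returns far too much and floods the screen, so something like SQL's LIMIT keyword would be ideal. In the end I just ran ./kafka-console-consumer.sh to look at its help output: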

# ./kafka-console-consumer.sh
The console consumer is a tool that reads data from Kafka and outputs it to standard output.

Option / Description
--------------------
--blacklist <blacklist>
    Blacklist of topics to exclude from consumption.
--bootstrap-server <server to connect to>
    REQUIRED (unless old consumer is used): The server to connect to.
--consumer-property <consumer_prop>
    A mechanism to pass user-defined properties in the form key=value to the consumer.
--consumer.config <config file>
    Consumer config properties file. Note that [consumer-property] takes precedence over this config.
--csv-reporter-enabled
    If set, the CSV metrics reporter will be enabled
--delete-consumer-offsets
    If specified, the consumer path in zookeeper is deleted when starting up
--enable-systest-events
    Log lifecycle events of the consumer in addition to logging consumed messages. (This is specific for system tests.)
--formatter <class>
    The name of a class to use for formatting kafka messages for display. (default: kafka.tools.DefaultMessageFormatter)
--from-beginning
    If the consumer does not already have an established offset to consume from, start with the earliest message present in the log rather than the latest message.
--key-deserializer <deserializer for key>
--max-messages <Integer: num_messages>
    The maximum number of messages to consume before exiting. If not set, consumption is continual.
--metrics-dir <metrics directory>
    If csv-reporter-enable is set, and this parameter isset, the csv metrics will be outputed here
--new-consumer
    Use the new consumer implementation. This is the default.
--offset <consume offset>
    The offset id to consume from (a non-negative number), or 'earliest' which means from beginning, or 'latest' which means from end (default: latest)
--partition <Integer: partition>
    The partition to consume from.
--property <prop>
    The properties to initialize the message formatter.
--skip-message-on-error
    If there is an error when processing a message, skip it instead of halt.
--timeout-ms <Integer: timeout_ms>
    If specified, exit if no message is available for consumption for the specified interval.
--topic <topic>
    The topic id to consume on.
--value-deserializer <deserializer for values>
--whitelist <whitelist>
    Whitelist of topics to include for consumption.
--zookeeper <urls>
    REQUIRED (only when using old consumer): The connection string for the zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over.

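Note the --max-messages option in there: it makes the consumer exit after consuming N messages. Combined with --from-beginning, it consumes the earliest N messages (as the help text says, --from-beginning only applies when the consumer has no established offset yet). After that you can happily play with the data.
If you instead want to exit after consuming the next N messages that arrive, just leave out --from-beginning.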
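The two modes above can be wrapped in a small shell function so the command does not have to be retyped each time. This is only a rough sketch, not anything shipped with Kafka: the function name kafka_take and the environment variables KAFKA_BIN, BOOTSTRAP and DRY_RUN are my own hypothetical names, and the only real flags used are the ones shown in the help output (--bootstrap-server, --topic, --max-messages, --from-beginning).

```shell
# kafka_take <topic> [count] [earliest|latest]
# Hypothetical helper (not part of Kafka). Assumed environment variables:
#   KAFKA_BIN  directory containing kafka-console-consumer.sh (default: .)
#   BOOTSTRAP  broker address (default: localhost:9092)
#   DRY_RUN    if non-empty, print the command instead of running it
kafka_take() {
  local topic="${1:-}"
  local count="${2:-10}"
  local mode="${3:-earliest}"

  if [ -z "$topic" ]; then
    echo "usage: kafka_take <topic> [count] [earliest|latest]" >&2
    return 1
  fi

  # Reject anything that is not a plain non-negative integer.
  case "$count" in
    ''|*[!0-9]*)
      echo "kafka_take: count must be an integer, got '$count'" >&2
      return 1
      ;;
  esac

  # Build the argument list in the positional parameters.
  set -- "${KAFKA_BIN:-.}/kafka-console-consumer.sh" \
    --bootstrap-server "${BOOTSTRAP:-localhost:9092}" \
    --topic "$topic" \
    --max-messages "$count"

  # earliest: read the oldest messages; latest: wait for newly arriving ones.
  if [ "$mode" = "earliest" ]; then
    set -- "$@" --from-beginning
  fi

  if [ -n "${DRY_RUN:-}" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}
```

For example, `kafka_take xxxxxx 10` reads the earliest 10 messages, while `kafka_take xxxxxx 10 latest` waits for the next 10 to arrive. In latest mode the command blocks until enough messages show up; the --timeout-ms option from the help output looks like the way to bound that wait, though I have not added it to the sketch.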
The road ahead is long, so keep at it one step at a time! A good memory is no match for a worn-out keyboard!