Getting Started with storm-kafka
In data-intensive architectures, Kafka commonly serves as an intermediate buffer. When computing statistics with Storm, we typically consume messages from Kafka and store the final results in Redis; the storm-kafka module makes it much easier to integrate with Kafka when developing a Spout.
This post first gives a brief introduction to KafkaSpout, then brings storm-kafka into the earlier WordCountRedisTopology example.
KafkaSpout

When constructing a KafkaSpout, you need to supply a KafkaConfig.
public class KafkaConfig implements Serializable {
    private static final long serialVersionUID = 5276718734571623855L;

    public final BrokerHosts hosts;
    public final String topic;
    public final String clientId;

    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int fetchMaxWait = FetchRequest.DefaultMaxWait();
    public int bufferSizeBytes = 1024 * 1024;
    public MultiScheme scheme = new RawMultiScheme();
    public boolean ignoreZkOffsets = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    public long maxOffsetBehind = Long.MAX_VALUE;
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    public int metricsTimeBucketSizeInSecs = 60;
    public int minFetchByte = FetchRequest.DefaultMinBytes();

    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }
}
The BrokerHosts field specifies the addresses of the Kafka brokers. The implementation class ZkHosts fetches the broker addresses dynamically from ZooKeeper, where Kafka registers them.
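A minimal sketch of the two ZkHosts constructors (the connect strings and broker path here are placeholders for illustration):

// Read broker metadata from ZooKeeper; storm-kafka's default broker path is "/brokers".
BrokerHosts hosts = new ZkHosts("localhost:2181");

// The broker path can be overridden when Kafka registers its brokers under a chroot.
BrokerHosts chrootedHosts = new ZkHosts("zk1:2181,zk2:2181", "/kafka/brokers");

KafkaConfig config = new KafkaConfig(hosts, "test");

There is also a StaticHosts implementation for a fixed broker list, but ZkHosts is the usual choice because it follows broker changes automatically.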

topic specifies the topic to consume. clientId becomes the Kafka client.id, which identifies this application when tracing calls on the broker side; the offsets themselves are stored in ZooKeeper under a path built from SpoutConfig's zkRoot and id (see below).
The scheme field configures how a ByteBuffer consumed from Kafka is converted into a Storm Tuple, and also controls the names of the Spout's output fields, as the example below shows.
The KafkaConfig subclass SpoutConfig provides finer control, such as setting the ZooKeeper root path.
public class SpoutConfig extends KafkaConfig implements Serializable {
    public List<String> zkServers = null;
    public Integer zkPort = null;
    public String zkRoot = null;
    public String id = null;
    public String outputStreamId;

    // setting for how often to save the current kafka offset to ZooKeeper
    public long stateUpdateIntervalMs = 2000;

    // Retry strategy for failed messages
    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();

    // Exponential back-off retry settings. These are used by ExponentialBackoffMsgRetryManager
    // for retrying messages after a bolt calls OutputCollector.fail().
    // These come into effect only if ExponentialBackoffMsgRetryManager is being used.
    public long retryInitialDelayMs = 0;
    public double retryDelayMultiplier = 1.0;
    public long retryDelayMaxMs = 60 * 1000;
    public int retryLimit = -1;

    /**
     * Create a SpoutConfig without setting client.id, which can make the source application
     * ambiguous when tracing Kafka calls.
     */
    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }

    /**
     * Create a SpoutConfig with a client.id value.
     */
    public SpoutConfig(BrokerHosts hosts, String topic, String clientId, String zkRoot, String id) {
        super(hosts, topic, clientId);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
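The retry and offset-commit behaviour can be tuned directly through these public fields. A small sketch, reusing the hosts from the earlier example (the values are purely illustrative, not recommendations):

SpoutConfig spoutConfig = new SpoutConfig(hosts, "test", "/test", "test-spout");
// Commit consumed offsets to ZooKeeper every 5 seconds instead of the default 2.
spoutConfig.stateUpdateIntervalMs = 5000;
// Retry failed tuples with exponential back-off: 100 ms, 200 ms, 400 ms, ... capped at 10 s.
spoutConfig.retryInitialDelayMs = 100;
spoutConfig.retryDelayMultiplier = 2.0;
spoutConfig.retryDelayMaxMs = 10 * 1000;
// Drop a message after 5 failed attempts (-1, the default, retries forever).
spoutConfig.retryLimit = 5;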
Example
The main class sets up the SpoutConfig and wires the topology together.
BrokerHosts zkrHosts = new ZkHosts(PropertiesUtils.getKafkaProp("kafka.zkhosts"));
final String kafkaTopic = PropertiesUtils.getKafkaProp("kafka.topic");
final String zkRoot = "/" + kafkaTopic;
final String spoutId = kafkaTopic;
SpoutConfig kafkaConfig = new SpoutConfig(zkrHosts, kafkaTopic, zkRoot, spoutId);
// default is byte arr
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new KafkaSpout(kafkaConfig), 5);
builder.setBolt("split", new SplitSentenceBolt2(), 8).shuffleGrouping("spout");
builder.setBolt("count", new WordCountRedisBolt(jedisPoolConfig), 12)
       .fieldsGrouping("split", new Fields("word"));

Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
    conf.setNumWorkers(3);
    StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
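The snippet ends with the cluster-submission branch; the rest of the original main method is not shown. A common pattern for local development (an assumption here, continuing with the conf and builder defined above) is to fall back to LocalCluster when no arguments are passed:

// Hypothetical local-mode fallback when args is empty:
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count-kafka", conf, builder.createTopology());
Thread.sleep(60 * 1000); // let the topology run for a minute
cluster.shutdown();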
Note that we override the scheme in the config, so each Kafka message is passed on as a single string and the output field is named str.
public class StringScheme implements Scheme {
    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
    public static final String STRING_SCHEME_KEY = "str";

    public List<Object> deserialize(ByteBuffer bytes) {
        return new Values(deserializeString(bytes));
    }

    public static String deserializeString(ByteBuffer string) {
        if (string.hasArray()) {
            int base = string.arrayOffset();
            return new String(string.array(), base + string.position(), string.remaining());
        } else {
            return new String(Utils.toByteArray(string), UTF8_CHARSET);
        }
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
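Since getOutputFields() determines the field names downstream bolts read by, a custom Scheme can also emit several named fields. A hypothetical sketch, assuming messages of the form "level|message" (neither the format nor these field names appear in the original topology):

import java.nio.ByteBuffer;
import java.util.List;
import org.apache.storm.spout.Scheme;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;

public class LevelMessageScheme implements Scheme {
    public List<Object> deserialize(ByteBuffer buffer) {
        String raw = StringScheme.deserializeString(buffer);
        String[] parts = raw.split("\\|", 2);
        // Fall back to a default level when the delimiter is missing.
        return parts.length == 2 ? new Values(parts[0], parts[1]) : new Values("INFO", raw);
    }

    public Fields getOutputFields() {
        return new Fields("level", "message");
    }
}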
Downstream bolts can then pick up the corresponding Tuple by that field name.
public class SplitSentenceBolt2 extends BaseBasicBolt {
    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String sentence = input.getStringByField("str");
        String[] words = sentence.split(" ");
        for (String w : words) {
            collector.emit(new Values(w));
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
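WordCountRedisBolt comes from the earlier WordCountRedisTopology post and is not repeated here. As a rough sketch of what it does (an assumption based on the cnt.* keys in the Redis output below; the original version is constructed from a jedisPoolConfig rather than a host and port):

import java.util.Map;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;

public class WordCountRedisBolt extends BaseBasicBolt {
    private final String host;
    private final int port;
    private transient JedisPool pool;

    public WordCountRedisBolt(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // Connection pools are not serializable, so build one per worker.
        pool = new JedisPool(host, port);
    }

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String word = input.getStringByField("word");
        try (Jedis jedis = pool.getResource()) {
            jedis.incr("cnt." + word); // one counter key per word
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // Terminal bolt: nothing is emitted downstream.
    }
}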
Use the Kafka command-line producer to send some messages.
➜ kafka_2.11-2.0.0 ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>x
>hello
>hello
>kafka
>kafka is what
>reading is a good habit
>kafka kafka broker partition
>reading is a good habit
>storm is coming
>let it come storm
>hello
>hello
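To double-check that the messages actually landed in the topic, they can be replayed with the console consumer (assuming the same local broker):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning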
The aggregated counts then show up in Redis.
127.0.0.1:6379> scan 0 match cnt.*
1) "5"
2)  1) "cnt.x"
    2) "cnt.good"
    3) "cnt.it"
    4) "cnt.let"
    5) "cnt.habit"
    6) "cnt.partition"
    7) "cnt.coming"
    8) "cnt.what"
    9) "cnt.is"
   10) "cnt.kafka"
127.0.0.1:6379> get cnt.storm
"2"
127.0.0.1:6379> get cnt.kafka
"4"
127.0.0.1:6379>