Storm Kafka Integration (0.10.x+)官方文件翻譯:storm與kafka整合
Storm Kafka Integration (0.10.x+)
相容性
Apache Kafka版本0.10以上
向kafka寫資料作為拓撲的一部分
你可以建立一個org.apache.storm.kafka.bolt.KafkaBolt
的例項,並將其作為一個元件新增到你的拓撲上,或者如果你正在使用trident
你可以使用
org.apache.storm.kafka.trident.TridentState
, org.apache.storm.kafka.trident.TridentStateFactory
和org.apache.storm.kafka.trident.TridentKafkaUpdater
你需要實現下面兩個介面:
TupleToKafkaMapper
和 TridentTupleToKafkaMapper
這些介面有兩個方法定義:
K getKeyFromTuple(Tuple/TridentTuple tuple);
V getMessageFromTuple(Tuple/TridentTuple tuple);
顧名思義,這些方法被呼叫對映一個tuple到一個kafka key和kafka message。
如果你只需要一個欄位作為key,一個欄位作為value,那麼你可以使用提供的FieldNameBasedTupleToKafkaMapper.java
- 在
KafkaBolt
裡,如果使用預設建構函式構造FieldNameBasedTupleToKafkaMapper
,則實現始終會查詢具有欄位名稱“key”和“message”的欄位,以實現向後相容性的原因。
或者,您也可以使用非預設建構函式指定不同的key和message欄位。 - 在
TridentKafkaState
中,您必須指定key和message的欄位名稱,因為沒有預設建構函式。
在構造FieldNameBasedTupleToKafkaMapper
的例項時應指定這些。
KafkaTopicSelector
和trident KafkaTopicSelector
這個介面只有一個方法
public interface KafkaTopicSelector {
String getTopics(Tuple/TridentTuple tuple);
}
這個介面的實現應該返回要釋出tuple的key/message對映的主題。您可以返回一個null,該訊息將被忽略。如果你有一個靜態主題名稱,那麼你可以
使用DefaultTopicSelector.java並在建構函式中設定主題的名稱。
FieldNameTopicSelector 和 FieldIndexTopicSelector可以被使用來選擇一個topic應該去釋出一個tuple到哪。(select the topic should to publish a tuple to.)
使用者只需要在tuple本身中指定topic名稱的欄位名稱或欄位索引。
當topic名稱未找到時,Field * TopicSelector將會將訊息寫入預設topic。請保證預設的topic已經被建立。
指定Kafka生產者屬性
您可以通過呼叫KafkaBolt.withProducerProperties()
和TridentKafkaStateFactory.withProducerProperties()
來提供Storm拓撲中的所有生產者屬性。
生產者的重要的配置屬性包括:
- metadata.broker.list
- request.required.acks
- producer.type
- serializer.class
這些也被定義在org.apache.kafka.clients.producer.ProducerConfig
裡
使用萬用字元topic匹配(Using wildcard kafka topic match)
您可以通過新增以下配置來進行萬用字元主題匹配
Config config = new Config(); config.put("kafka.topic.wildcard.match",true);
之後你可以制定一個萬用字元主題去匹配。例如clickstream.*.log. 這將會匹配如下所有的流 clickstream.my.log, clickstream.cart.log 等等。
綜上:
for the bolt:
TopologyBuilder builder = new TopologyBuilder();
Fields fields = new Fields("key", "message");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
builder.setSpout("spout", spout, 5);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaBolt bolt = new KafkaBolt()
.withProducerProperties(props)
.withTopicSelector(new DefaultTopicSelector("test"))
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");
Config conf = new Config();
StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());
For Trident:
Fields fields = new Fields("word", "count");
FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
new Values("storm", "1"),
new Values("trident", "1"),
new Values("needs", "1"),
new Values("javadoc", "1")
);
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
Stream stream = topology.newStream("spout1", spout);
//set producer properties.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "1");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
.withProducerProperties(props)
.withKafkaTopicSelector(new DefaultTopicSelector("test"))
.withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());
Config conf = new Config();
StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());
從kafka讀資料(Spouts)
配置
spout的實現使用KafkaSpoutConfig
類來配置。這個類使用Build模式,可以通過呼叫其中一個Builds建構函式或通過呼叫KafkaSpoutConfig
類中的靜態方法構建器來啟動。
建立構建器的建構函式或靜態方法需要幾個key values(稍後可以更改),但是啟動一個spout所需的最小配置。
bootstrapServers
與Kafka Consumer Property “bootstrap.servers”是相同的。
spout將消耗的主題可以是特定topic名稱(1個或更多)的集合或正則表示式Pattern,它指定匹配該正則表示式的任何主題將被使用。
在建構函式的情況下,您可能還需要指定鍵解串器和值解串器。這是為了通過使用Java泛型來保證型別安全。預設值為StringDeserializer
,
可以通過呼叫setKeyDeserializer
和/或 setValueDeserializer
來覆蓋。
如果這些設定為null,程式碼將回退到kafka屬性中設定的內容,但是最好在這裡明確表示,再次使用泛型來維護型別安全性。
有幾個關鍵配置要注意
setFirstPollOffsetStrategy
:允許您設定從哪裡開始使用資料.這在故障恢復和首次啟動spout的情況下都被使用。允許的值包括:
EARLIEST
:無論以前的提交如何,kafka spout會輪詢從分割槽的第一個偏移開始的記錄LATEST
:無論以前的提交如何,kafka spout輪詢(具有大於分割槽中最後一個偏移量的)偏移量的記錄UNCOMMITTED_EARLIEST
(預設):kafka spout從最後提交的偏移量(如果有的話)中輪詢記錄,如果沒有提交任何偏移量,則表現為EARLIEST
。UNCOMMITTED_LATEST
:kafka spout從最後提交的偏移量(如果有的話)中輪詢記錄,如果沒有提交任何偏移量,則表現為LATEST
。
setRecordTranslator
:允許您修改spout如何將Kafka Consume Record轉換為tuple,以及將釋出該元組的流。
預設情況下,”topic”, “partition”, “offset”, “key” 和 “value”將被髮送到“預設”流。
如果要根據主題將條目輸出到不同的流,則storm提供ByTopicRecordTranslator
。有關如何使用這些的更多示例,請參見下文。setProp
:可以用來設定沒有方便方法的kafka屬性。setGroupId
:讓你設定kafka消費者組屬性“group.id”的id。setSSLKeystore
和setSSLTruststore
:允許您配置SSL身份驗證。
用法示例
API是用java 8 lambda表示式寫的,它可以與java7及以下版本配合使用。
建立一個簡單的不安全Spout
以下將消費所有的釋出到“topic”的事件,併發送他們到
MyBolt
有著欄位”topic”, “partition”, “offset”, “key”, “value”。
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
萬用字元topics
萬用字元主題將從指定代理列表中存在的所有主題消耗,並匹配該模式。所以在下面的例子中,”topic”, “topic_foo” and “topic_bar”將都會匹配”topic.*”,
但是”not_my_topic”不匹配。
final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
Multiple Streams(多個流)
這使用了java 8 lambda表示式。
final TopologyBuilder tp = new TopologyBuilder();
//By default all topics not covered by another rule, but consumed by the spout will be emitted to "STREAM_1" as "topic", "key", and "value"
//預設情況下,所有未被其他規則覆蓋的主題,但是由spout消耗的所有主題將被髮送到“STREAM_1”作為"topic", "key", and "value"
ByTopicRecordTranslator byTopic = new ByTopicRecordTranslator<>( (r) -> new Values(r.topic(), r.key(), r.value()), new Fields("topic", "key", "value"), "STREAM_1");
//For topic_2 all events will be emitted to "STREAM_2" as just "key" and "value"
byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2");
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", "topic_2", "topic_3").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1");
tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2");
Trident
final TridentTopology tridentTopology = new TridentTopology();
final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()))
.parallelismHint(1)
Trident不支援多個流,並將忽略為輸出設定的任何流。然而,如果每個輸出主題的欄位不相同,它將丟擲異常,而不會繼續。
Custom RecordTranslators(自定義RecordTranslators)(高階)
在大多數情況下,內建的SimpleRecordTranslator和ByTopicRecordTranslator應該能覆蓋您的用例。
如果您遇到需要定製的情況,則本文件將介紹如何正確執行此操作,以及一些不太明顯的類。
適用的要點是使用ConsumerRecord並將其轉換為可以發出的List 。不明顯的是如何告訴spout將其發射到特定的流。為此,
您將需要返回一個org.apache.storm.kafka.spout.KafkaTuple
的例項。這提供了一個routedTo
方法,它將說明tuple應該去哪個特定的流。
例如:return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
Will cause the tuple to be emitted on the “bar” stream(將導致元組在“bar”流中發出)
在編寫自定義record translators時要小心,因為就像在storm spout中,它需要自我一致。stream
方法應該返回一組完整的流,這個轉換器將會嘗試傳送。
另外getFieldsFor
應為每個流返回一個有效的Fields
物件。
如果您正在為Trident執行此操作,則值必須位於通過應用該流的Fields物件中的每個欄位返回的List中,否則trident可能會丟擲異常。
手動分割槽控制(高階)
預設情況下,Kafka將自動將分割槽分配給當前的一組分支。它處理很多事情,但在某些情況下,您可能需要手動分配分割槽。
當spout go down 並重新啟動時,這可能會導致更少的churn(攪拌),但是如果沒有完成,可能會導致很多問題。
這都可以通過子類化Subscription來處理,我們有幾個實現,您可以檢視有關如何執行此操作的示例。
ManualPartitionNamedSubscription
和 ManualPartitionPatternSubscription
。請使用這些或實現你自己的時候要小心。