1. 程式人生 > >Storm Kafka Integration (0.10.x+)官方文件翻譯:storm與kafka整合

Storm Kafka Integration (0.10.x+)官方文件翻譯:storm與kafka整合

Storm Kafka Integration (0.10.x+)

相容性

Apache Kafka版本0.10以上

向kafka寫資料作為拓撲的一部分

你可以建立一個org.apache.storm.kafka.bolt.KafkaBolt的例項,並將其作為一個元件新增到你的拓撲上,或者如果你正在使用trident你可以使用
org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory
org.apache.storm.kafka.trident.TridentKafkaUpdater

.

你需要實現下面兩個介面:

TupleToKafkaMapperTridentTupleToKafkaMapper

這些介面有兩個方法定義:

    K getKeyFromTuple(Tuple/TridentTuple tuple);
    V getMessageFromTuple(Tuple/TridentTuple tuple);

顧名思義,這些方法被呼叫對映一個tuple到一個kafka key和kafka message。
如果你只需要一個欄位作為key,一個欄位作為value,那麼你可以使用提供的FieldNameBasedTupleToKafkaMapper.java

實現。

  • KafkaBolt裡,如果使用預設建構函式構造FieldNameBasedTupleToKafkaMapper,則實現始終會查詢具有欄位名稱“key”和“message”的欄位,以實現向後相容性的原因。
    或者,您也可以使用非預設建構函式指定不同的key和message欄位。
  • TridentKafkaState中,您必須指定key和message的欄位名稱,因為沒有預設建構函式。
    在構造FieldNameBasedTupleToKafkaMapper的例項時應指定這些。

KafkaTopicSelectortrident KafkaTopicSelector

這個介面只有一個方法

    public interface KafkaTopicSelector { 
        String getTopics(Tuple/TridentTuple tuple); 
    } 

這個介面的實現應該返回要釋出tuple的key/message對映的主題。您可以返回一個null,該訊息將被忽略。如果你有一個靜態主題名稱,那麼你可以
使用DefaultTopicSelector.java並在建構函式中設定主題的名稱
FieldNameTopicSelector 和 FieldIndexTopicSelector可以被使用來選擇一個topic應該去釋出一個tuple到哪。(select the topic should to publish a tuple to.)
使用者只需要在tuple本身中指定topic名稱的欄位名稱或欄位索引。
當topic名稱未找到時,Field * TopicSelector將會將訊息寫入預設topic。請保證預設的topic已經被建立

指定Kafka生產者屬性

您可以通過呼叫KafkaBolt.withProducerProperties()TridentKafkaStateFactory.withProducerProperties()來提供Storm拓撲中的所有生產者屬性。
生產者的重要的配置屬性包括:

  • metadata.broker.list
  • request.required.acks
  • producer.type
  • serializer.class

這些也被定義在org.apache.kafka.clients.producer.ProducerConfig

使用萬用字元topic匹配(Using wildcard kafka topic match)

您可以通過新增以下配置來進行萬用字元主題匹配

Config config = new Config(); config.put("kafka.topic.wildcard.match",true);

之後你可以制定一個萬用字元主題去匹配。例如clickstream.*.log. 這將會匹配如下所有的流 clickstream.my.log, clickstream.cart.log 等等。

綜上:

for the bolt:

        TopologyBuilder builder = new TopologyBuilder();

        Fields fields = new Fields("key", "message");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                    new Values("storm", "1"),
                    new Values("trident", "1"),
                    new Values("needs", "1"),
                    new Values("javadoc", "1")
        );
        spout.setCycle(true);
        builder.setSpout("spout", spout, 5);
        //set producer properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaBolt bolt = new KafkaBolt()
                .withProducerProperties(props)
                .withTopicSelector(new DefaultTopicSelector("test"))
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper());
        builder.setBolt("forwardToKafka", bolt, 8).shuffleGrouping("spout");

        Config conf = new Config();

        StormSubmitter.submitTopology("kafkaboltTest", conf, builder.createTopology());

For Trident:

        Fields fields = new Fields("word", "count");
        FixedBatchSpout spout = new FixedBatchSpout(fields, 4,
                new Values("storm", "1"),
                new Values("trident", "1"),
                new Values("needs", "1"),
                new Values("javadoc", "1")
        );
        spout.setCycle(true);

        TridentTopology topology = new TridentTopology();
        Stream stream = topology.newStream("spout1", spout);

        //set producer properties.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("acks", "1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
                .withProducerProperties(props)
                .withKafkaTopicSelector(new DefaultTopicSelector("test"))
                .withTridentTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper("word", "count"));
        stream.partitionPersist(stateFactory, fields, new TridentKafkaUpdater(), new Fields());

        Config conf = new Config();
        StormSubmitter.submitTopology("kafkaTridentTest", conf, topology.build());

從kafka讀資料(Spouts)

配置

spout的實現使用KafkaSpoutConfig類來配置。這個類使用Build模式,可以通過呼叫其中一個Builds建構函式或通過呼叫KafkaSpoutConfig類中的靜態方法構建器來啟動。
建立構建器的建構函式或靜態方法需要幾個key values(稍後可以更改),但是啟動一個spout所需的最小配置。
bootstrapServers與Kafka Consumer Property “bootstrap.servers”是相同的。
spout將消耗的主題可以是特定topic名稱(1個或更多)的集合或正則表示式Pattern,它指定匹配該正則表示式的任何主題將被使用。

在建構函式的情況下,您可能還需要指定鍵解串器和值解串器。這是為了通過使用Java泛型來保證型別安全。預設值為StringDeserializer
可以通過呼叫setKeyDeserializer 和/或 setValueDeserializer來覆蓋。
如果這些設定為null,程式碼將回退到kafka屬性中設定的內容,但是最好在這裡明確表示,再次使用泛型來維護型別安全性。

有幾個關鍵配置要注意

  • setFirstPollOffsetStrategy:允許您設定從哪裡開始使用資料.這在故障恢復和首次啟動spout的情況下都被使用。允許的值包括:
    • EARLIEST:無論以前的提交如何,kafka spout會輪詢從分割槽的第一個偏移開始的記錄
    • LATEST:無論以前的提交如何,kafka spout輪詢(具有大於分割槽中最後一個偏移量的)偏移量的記錄
    • UNCOMMITTED_EARLIEST(預設):kafka spout從最後提交的偏移量(如果有的話)中輪詢記錄,如果沒有提交任何偏移量,則表現為EARLIEST
    • UNCOMMITTED_LATEST:kafka spout從最後提交的偏移量(如果有的話)中輪詢記錄,如果沒有提交任何偏移量,則表現為LATEST
  • setRecordTranslator:允許您修改spout如何將Kafka Consume Record轉換為tuple,以及將釋出該元組的流。
    預設情況下,”topic”, “partition”, “offset”, “key” 和 “value”將被髮送到“預設”流。
    如果要根據主題將條目輸出到不同的流,則storm提供ByTopicRecordTranslator。有關如何使用這些的更多示例,請參見下文。
  • setProp:可以用來設定沒有方便方法的kafka屬性。
  • setGroupId:讓你設定kafka消費者組屬性“group.id”的id。
  • setSSLKeystore 和 setSSLTruststore:允許您配置SSL身份驗證。

用法示例

API是用java 8 lambda表示式寫的,它可以與java7及以下版本配合使用。

建立一個簡單的不安全Spout

以下將消費所有的釋出到“topic”的事件,併發送他們到MyBolt有著欄位”topic”, “partition”, “offset”, “key”, “value”。

final TopologyBuilder tp = new TopologyBuilder(); 
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic").build()), 1); 
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout"); 

萬用字元topics
萬用字元主題將從指定代理列表中存在的所有主題消耗,並匹配該模式。所以在下面的例子中,”topic”, “topic_foo” and “topic_bar”將都會匹配”topic.*”,
但是”not_my_topic”不匹配。

final TopologyBuilder tp = new TopologyBuilder();
tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout");
Multiple Streams(多個流)

這使用了java 8 lambda表示式。

final TopologyBuilder tp = new TopologyBuilder();
//By default all topics not covered by another rule, but consumed by the spout will be emitted to "STREAM_1" as "topic", "key", and "value" 
//預設情況下,所有未被其他規則覆蓋的主題,但是由spout消耗的所有主題將被髮送到“STREAM_1”作為"topic", "key", and "value"
ByTopicRecordTranslator byTopic = new ByTopicRecordTranslator<>( (r) -> new Values(r.topic(), r.key(), r.value()), new Fields("topic", "key", "value"), "STREAM_1"); 
//For topic_2 all events will be emitted to "STREAM_2" as just "key" and "value" 
byTopic.forTopic("topic_2", (r) -> new Values(r.key(), r.value()), new Fields("key", "value"), "STREAM_2");

tp.setSpout("kafka_spout", new KafkaSpout<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, "topic_1", "topic_2", "topic_3").build()), 1);
tp.setBolt("bolt", new myBolt()).shuffleGrouping("kafka_spout", "STREAM_1"); 
tp.setBolt("another", new myOtherBolt()).shuffleGrouping("kafka_spout", "STREAM_2");
Trident
final TridentTopology tridentTopology = new TridentTopology();
final Stream spoutStream = tridentTopology.newStream("kafkaSpout",
    new KafkaTridentSpoutOpaque<>(KafkaSpoutConfig.builder("127.0.0.1:" + port, Pattern.compile("topic.*")).build()))
      .parallelismHint(1)

Trident不支援多個流,並將忽略為輸出設定的任何流。然而,如果每個輸出主題的欄位不相同,它將丟擲異常,而不會繼續。

Custom RecordTranslators(自定義RecordTranslators)(高階)

在大多數情況下,內建的SimpleRecordTranslator和ByTopicRecordTranslator應該能覆蓋您的用例。
如果您遇到需要定製的情況,則本文件將介紹如何正確執行此操作,以及一些不太明顯的類。

適用的要點是使用ConsumerRecord並將其轉換為可以發出的List 。不明顯的是如何告訴spout將其發射到特定的流。為此,
您將需要返回一個org.apache.storm.kafka.spout.KafkaTuple的例項。這提供了一個routedTo方法,它將說明tuple應該去哪個特定的流。
例如:return new KafkaTuple(1, 2, 3, 4).routedTo("bar");
Will cause the tuple to be emitted on the “bar” stream(將導致元組在“bar”流中發出)

在編寫自定義record translators時要小心,因為就像在storm spout中,它需要自我一致。stream方法應該返回一組完整的流,這個轉換器將會嘗試傳送。

另外getFieldsFor應為每個流返回一個有效的Fields物件。
如果您正在為Trident執行此操作,則值必須位於通過應用該流的Fields物件中的每個欄位返回的List中,否則trident可能會丟擲異常。

手動分割槽控制(高階)

預設情況下,Kafka將自動將分割槽分配給當前的一組分支。它處理很多事情,但在某些情況下,您可能需要手動分配分割槽。

當spout go down 並重新啟動時,這可能會導致更少的churn(攪拌),但是如果沒有完成,可能會導致很多問題。
這都可以通過子類化Subscription來處理,我們有幾個實現,您可以檢視有關如何執行此操作的示例。
ManualPartitionNamedSubscriptionManualPartitionPatternSubscription。請使用這些或實現你自己的時候要小心。