1. 程式人生 > >kafka與storm整合

kafka與storm整合

kafka與storm的整合步驟

採用官方storm-kafka-client方式進行整合

一 引入pom依賴

  <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <storm.version>1.1.1</storm.version>
        <storm.kafka.client.version>1.1.1</storm.kafka.client.version
>
<kafka.version>0.10.0.1</kafka.version> <kafka.client.version>0.10.0.1</kafka.client.version> <jdk.version>1.8</jdk.version> </properties> <dependencies> <dependency> <groupId>org.apache.storm</groupId
>
<artifactId>storm-core</artifactId> <version>${storm.version}</version> <!--<scope>provided</scope>--><!--本地模式需要將<scope>provided</scope> 遮蔽掉--> </dependency> <dependency> <groupId
>
org.apache.storm</groupId> <artifactId>storm-kafka-client</artifactId> <version>${storm.kafka.client.version}</version> </dependency> <!--如果需要更改kafka客戶端的版本,額可以通過此依賴進行定義--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.client.version}</version> </dependency> </dependencies>

二 從kafka獲取資料

通過KafkaSpout進行接入資料,注意新版本配置必須通過kafka:9092埠去獲取資料,而不是通過zk去請求資料
2.1 配置
 //KafkaSpout在將資料解析的時候  預設封裝5個元資料分別為:"topic","partition","offset","key","value":value即為kafka中的資料
builder.setSpout("kafkaSpout", new KafkaSpout<>(CollectorKafkaSpoutConfig.getKafkaSpoutConfig()), 1);

public static KafkaSpoutConfig getKafkaSpoutConfig(){

        Properties props = new Properties();

        props.put("group.id", "消費組GROUP_ID");
        //props.put("auto.offset.reset", "earliest");

        return KafkaSpoutConfig
                .builder("127.0.0.1:9092,10.0.0.1:9092", "kafka Topic")
                .setProp(props)
                .setKey(ByteArrayDeserializer.class)//反序列話配置
                .setValue(ByteArrayDeserializer.class)//反序列化配置
                .build();

    }

三 往kafka輸出資料

通過kakfaBolt進行操作
3.1 配置
 KafkaBolt kafkaBolt = new KafkaBolt()
                .withProducerProperties(KafkaBoltConfig.getKafkaBoltConfig())
                .withTopicSelector(new DefaultTopicSelector("kafkaTopicName"))//設定kafka主題
                .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("kafkaKeyName", "kafkaValue"));//將資料對映到kafka訊息佇列key=>value,注意:kafkaValue是bolt中的outputFieldsDeclarer.declare(new Fields("kafkaValue"));定義的值
builder.setBolt("kafkaOutBolt", kafkaBolt).shuffleGrouping("bolt資料");

四 參考

https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md