kafka與storm整合
阿新 • • 發佈:2018-11-28
kafka與storm的整合步驟
採用官方storm-kafka-client方式進行整合
一 引入pom依賴
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<storm.version>1.1.1</storm.version>
<storm.kafka.client.version>1.1.1</storm.kafka.client.version >
<kafka.version>0.10.0.1</kafka.version>
<kafka.client.version>0.10.0.1</kafka.client.version>
<jdk.version>1.8</jdk.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.storm</groupId >
<artifactId>storm-core</artifactId>
<version>${storm.version}</version>
<!--<scope>provided</scope>--><!--本地模式需要將<scope>provided</scope> 遮蔽掉-->
</dependency>
<dependency>
<groupId >org.apache.storm</groupId>
<artifactId>storm-kafka-client</artifactId>
<version>${storm.kafka.client.version}</version>
</dependency>
<!--如果需要更改kafka客戶端的版本,額可以通過此依賴進行定義-->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.client.version}</version>
</dependency>
</dependencies>
二 從kafka獲取資料
通過KafkaSpout進行接入資料,注意新版本配置必須通過kafka:9092埠去獲取資料,而不是通過zk去請求資料
2.1 配置
//KafkaSpout在將資料解析的時候 預設封裝5個元資料分別為:"topic","partition","offset","key","value":value即為kafka中的資料
builder.setSpout("kafkaSpout", new KafkaSpout<>(CollectorKafkaSpoutConfig.getKafkaSpoutConfig()), 1);
public static KafkaSpoutConfig getKafkaSpoutConfig(){
Properties props = new Properties();
props.put("group.id", "消費組GROUP_ID");
//props.put("auto.offset.reset", "earliest");
return KafkaSpoutConfig
.builder("127.0.0.1:9092,10.0.0.1:9092", "kafka Topic")
.setProp(props)
.setKey(ByteArrayDeserializer.class)//反序列話配置
.setValue(ByteArrayDeserializer.class)//反序列化配置
.build();
}
三 往kafka輸出資料
通過kakfaBolt進行操作
3.1 配置
KafkaBolt kafkaBolt = new KafkaBolt()
.withProducerProperties(KafkaBoltConfig.getKafkaBoltConfig())
.withTopicSelector(new DefaultTopicSelector("kafkaTopicName"))//設定kafka主題
.withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<>("kafkaKeyName", "kafkaValue"));//將資料對映到kafka訊息佇列key=>value,注意:kafkaValue是bolt中的outputFieldsDeclarer.declare(new Fields("kafkaValue"));定義的值
builder.setBolt("kafkaOutBolt", kafkaBolt).shuffleGrouping("bolt資料");
四 參考
https://github.com/apache/storm/blob/master/docs/storm-kafka-client.md