訊息佇列(三)--與spring整合(採用註解消費佇列)
阿新 • • 發佈:2019-01-29
一、簡介
這裡採用註解方式使用kafka。
二、新增maven依賴
<!--
  Spring for Apache Kafka. The version is pinned instead of using the floating
  "RELEASE" metaversion so that builds are reproducible and stay aligned with
  the kafka-clients version declared below.
  NOTE(review): 2.1.x is believed to be the spring-kafka line built against
  kafka-clients 1.0.x; confirm against the project's compatibility matrix.
-->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.1.0.RELEASE</version>
</dependency>
<!-- Kafka Java client used by spring-kafka. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>1.0.0</version>
</dependency>
三、生產者開發步驟
1、生產者配置
package com.dragon.study.kafka.producer;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

/**
 * Spring configuration for the producer side of the example.
 *
 * <p>Exposes three beans: the raw producer properties, a {@link ProducerFactory}
 * built from those properties, and a {@link KafkaTemplate} built from the factory.
 */
@Configuration
@EnableKafka
@ComponentScan("com.dragon.study.kafka.producer")
@EnableAspectJAutoProxy
public class KafkaProducerConfig {

    /**
     * @return the template used to send String-keyed, String-valued messages
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * @return a producer factory configured from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * @return producer properties: broker address plus String serializers for key and value
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}
2、啟動生產者,傳送訊息
package com.dragon.study.kafka.producer;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;

/**
 * Entry point that boots the producer configuration and sends a single test message.
 */
public class KafkaProducerMain {

    /**
     * Starts the Spring context, sends {@code "msg-yes"} to topic {@code my_topic_1},
     * flushes the template and then closes the context.
     *
     * @param args unused
     * @throws Exception if the context fails to start or the send fails
     */
    public static void main(String[] args) throws Exception {
        // try-with-resources closes the context once the message has been flushed,
        // so the beans it created are released instead of being leaked.
        try (AnnotationConfigApplicationContext ctx =
                new AnnotationConfigApplicationContext(KafkaProducerConfig.class)) {
            // The bean is declared as KafkaTemplate<String, String> in KafkaProducerConfig;
            // the generic parameters are erased at runtime, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            KafkaTemplate<String, String> kafkaTemplate =
                    (KafkaTemplate<String, String>) ctx.getBean("kafkaTemplate");
            kafkaTemplate.send("my_topic_1", "msg-yes");
            // Force the buffered record out before the context is closed.
            kafkaTemplate.flush();
        }
    }
}
四、消費者配置
1、消費者監聽
package com.dragon.study.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Annotation-driven consumer that prints every record received from {@code my_topic_1}.
 */
@Component
public class SimpleKafkaListener {

    /**
     * Handles one record from the topic and writes its value to standard output.
     *
     * <p>{@code topics} is used rather than {@code topicPattern} because the listener
     * targets one literal topic name, not a regular expression over topic names.
     *
     * @param data the consumed record; key and value are Strings, matching the
     *             String deserializers configured in the consumer configuration
     */
    @KafkaListener(topics = "my_topic_1", groupId = "my_group_1")
    public void simplerListener(ConsumerRecord<String, String> data) {
        System.out.println("receive msg :" + data.value());
    }
}
2、消費者配置
package com.dragon.study.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka @ComponentScan("com.dragon.study.kafka.consumer") @EnableAspectJAutoProxy public class KafkaConsumerConfig { @Bean public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(3); factory.getContainerProperties().setPollTimeout(3000); return factory; } @Bean public ConsumerFactory<String, String> consumerFactory(){ return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public Map<String, Object> consumerConfigs(){ Map<String,Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.131.24.179:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } }3、啟動消費者
package com.dragon.study.kafka.consumer;

import org.springframework.context.annotation.AnnotationConfigApplicationContext;

import java.util.concurrent.TimeUnit;

/**
 * Entry point that boots the consumer configuration and keeps the main thread parked
 * so the listeners have time to receive messages.
 */
public class KafkaConsumerMain {

    /**
     * Starts the Spring context and then sleeps for one hour.
     *
     * @param args unused
     * @throws Exception if the context fails to start or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx =
                new AnnotationConfigApplicationContext(KafkaConsumerConfig.class);
        // Close the context when the JVM shuts down (e.g. Ctrl-C) so that the
        // beans it created are destroyed instead of being abandoned.
        ctx.registerShutdownHook();
        TimeUnit.HOURS.sleep(1);
    }
}