1. 程式人生 > >訊息佇列(三)--與spring整合(採用註解消費佇列)

訊息佇列(三)--與spring整合(採用註解消費佇列)

一、簡介

這裡採用註解方式使用kafka。

二、新增maven依賴

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>RELEASE</version>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>
kafka-clients</artifactId> <version>1.0.0</version> </dependency>

三、消費者開發步驟

1、生產者配置

package com.dragon.study.kafka.producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean
; import org.springframework.context.annotation.ComponentScan; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import java.util.HashMap; import java.util.Map; @Configuration @EnableKafka @ComponentScan("com.dragon.study.kafka.producer") @EnableAspectJAutoProxy public class KafkaProducerConfig { @Bean public KafkaTemplate<String, String> kafkaTemplate(){ return new KafkaTemplate<>(producerFactory()); } @Bean public ProducerFactory<String, String> producerFactory(){ return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public Map<String, Object> producerConfigs(){ Map<String,Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } }
2、啟動生產者,傳送訊息
package com.dragon.study.kafka.producer;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.kafka.core.KafkaTemplate;
public class KafkaProducerMain {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaProducerConfig.class);
KafkaTemplate<String, String> kafkaTemplate = (KafkaTemplate<String, String>) ctx.getBean("kafkaTemplate");
kafkaTemplate.send("my_topic_1", "msg-yes");
kafkaTemplate.flush();
}
}
四、消費者配置

1、消費者監聽

package com.dragon.study.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class SimpleKafkaListener {

    @KafkaListener(topicPattern = "my_topic_1", groupId = "my_group_1")
    public void simplerListener(ConsumerRecord data){
        System.out.println("receive msg :"+ data.value());
}
}
2、消費者配置
package com.dragon.study.kafka.consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.EnableAspectJAutoProxy;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
@ComponentScan("com.dragon.study.kafka.consumer")
@EnableAspectJAutoProxy
public class KafkaConsumerConfig {

    @Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(){
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
        return factory;
}


    @Bean
public ConsumerFactory<String, String> consumerFactory(){
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}

    @Bean
public Map<String, Object> consumerConfigs(){
        Map<String,Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.131.24.179:9092");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
}
}
3、啟動消費者
package com.dragon.study.kafka.consumer;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import java.util.concurrent.TimeUnit;
public class KafkaConsumerMain {
    public static void main(String[] args) throws Exception {
        AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(KafkaConsumerConfig.class);
TimeUnit.HOURS.sleep(1);
}
}