
Distributed Messaging System: Kafka (9) Implementing a Producer and Consumer with Spring Boot

1. Project Setup

(1) Create a new Spring Boot project; see the following guide:
Creating a Spring Boot project
(2) Add the spring-kafka dependency to the pom file:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2. Producer

Core class:

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class KafkaService {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void sendKafkaMsg(String topic, String data) {
        kafkaTemplate.send(topic, data);
    }
}

That is all it takes to send a message.

The key call here is kafkaTemplate.send(), which takes the topic (the subject to publish to) and data (the message body). The topic does not need to be created in Kafka ahead of time by the Java program: as long as the broker has auto.create.topics.enable set to true (the default), the topic is created automatically on the first send.
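The KafkaTemplate injected above has to come from somewhere: Spring Boot auto-configures one when spring.kafka.bootstrap-servers is set in application.properties, or you can declare it yourself. A minimal manual sketch (the broker addresses are an assumption, chosen to match the consumer configuration later in this post) might look like:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Broker list; adjust to your own cluster (assumed to match the consumer config below)
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.71.61:9092,192.168.71.62:9092,192.168.71.63:9092");
        // String key/value serializers, mirroring the String deserializers on the consumer side
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(props);
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
```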

3. Consumer

(1) properties file configuration (a yml file can be used instead):

server.port=8082

#============== kafka ===================#
kafka.consumer.zookeeper.connect=192.168.71.61:2181,192.168.71.62:2181,192.168.71.63:2181
kafka.consumer.servers=192.168.71.61:9092,192.168.71.62:9092,192.168.71.63:9092
kafka.consumer.enable.auto.commit=true
kafka.consumer.session.timeout=6000
kafka.consumer.auto.commit.interval=100
kafka.consumer.auto.offset.reset=latest
kafka.consumer.topic=test
kafka.consumer.group.id=test
kafka.consumer.concurrency=10

In the configuration above, the consumer application listens on port 8082. The cluster has three servers, each with its own IP address and port, so both the ZooKeeper ensemble and the Kafka broker addresses list three host:port pairs.

kafka.consumer.enable.auto.commit=true // commit the offset automatically after each message is consumed (the offset is the message's position, marking how far consumption has progressed; after each message the consumer reports its position to the Kafka broker so that consumption can resume from that point next time)
kafka.consumer.group.id=test // the consumer group this consumer belongs to
kafka.consumer.auto.offset.reset=latest // when no committed offset exists, start consuming from the latest messages
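As noted above, the same settings can live in application.yml instead of application.properties. A direct translation of the properties (same keys and values, just nested) would be:

```yaml
server:
  port: 8082

kafka:
  consumer:
    zookeeper:
      connect: 192.168.71.61:2181,192.168.71.62:2181,192.168.71.63:2181
    servers: 192.168.71.61:9092,192.168.71.62:9092,192.168.71.63:9092
    enable:
      auto:
        commit: true
    session:
      timeout: 6000
    auto:
      commit:
        interval: 100
      offset:
        reset: latest
    topic: test
    group:
      id: test
    concurrency: 10
```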
(2) Define the Kafka consumer configuration class and register the listener

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

/**
 * @ClassName: KafkaConfig
 * @Description: Defines the Kafka consumer configuration and registers the listener container factory
 * @author
 * @date 2018-05-22 17:38:26
 */
@Configuration
@EnableKafka
public class KafkaConfig {

    @Value("${kafka.consumer.servers}")
    private String servers;
    @Value("${kafka.consumer.enable.auto.commit}")
    private boolean enableAutoCommit;
    @Value("${kafka.consumer.session.timeout}")
    private String sessionTimeout;
    @Value("${kafka.consumer.auto.commit.interval}")
    private String autoCommitInterval;
    @Value("${kafka.consumer.group.id}")
    private String groupId;
    @Value("${kafka.consumer.auto.offset.reset}")
    private String autoOffsetReset;
    @Value("${kafka.consumer.concurrency}")
    private int concurrency;

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(concurrency);
        factory.getContainerProperties().setPollTimeout(1500);
        return factory;
    }

    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, enableAutoCommit);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, autoCommitInterval);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
        return propsMap;
    }

    @Bean
    public KafkaConsumer listener() {
        // Register our KafkaConsumer listener class (defined below) as a bean so its
        // @KafkaListener method is picked up; note this is our own class, not
        // org.apache.kafka.clients.consumer.KafkaConsumer.
        return new KafkaConsumer();
    }
}

(3) The consumer

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

import com.alibaba.fastjson.JSON;

public class KafkaConsumer {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = "${kafka.consumer.topic}", containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record) {
        String topic = record.topic();
        String value = record.value();

        try {
            // System.out.println("kafka key: " + record.key() + ", kafka value: " + record.value());
            if (value != null) {
                // Deserialize the Kafka payload into an entity
                KafkaMessage kafkaMessage = JSON.parseObject(value, KafkaMessage.class);
            }
        } catch (Exception e) {
            logger.error("Error consuming Kafka message from topic {}, message: {}", topic, value, e);
        }
    }
}
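The KafkaMessage entity referenced in the listener is not shown in the original post; it is whatever POJO your producer serializes to JSON. A hypothetical minimal version that fastjson can bind to (the field names here are assumptions for illustration) might look like:

```java
// Hypothetical payload entity; the real field names depend on what the producer sends.
public class KafkaMessage {
    private String id;      // assumed message identifier
    private String content; // assumed message body

    // fastjson binds JSON fields via the default constructor and getters/setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }
    public String getContent() { return content; }
    public void setContent(String content) { this.content = content; }
}
```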