1. 程式人生 > spring boot 集成kafka (多線程,消費者使用kafka的原生api實現,因為@KafkaListener修改groupId無效)

spring boot 集成kafka (多線程,消費者使用kafka的原生api實現,因為@KafkaListener修改groupId無效)

初始化 接收 .bat truct singleton test ops cati xtend

application-test.properties

 1 #kafka
 2 kafka.consumer.zookeeper.connect=*:2181
 3 kafka.consumer.servers=*:9092
 4 kafka.consumer.enable.auto.commit=true
 5 kafka.consumer.session.timeout=6000
 6 kafka.consumer.auto.commit.interval=1000
 7 #保證每個組一個消費者消費同一條消息,若設置為earliest,那麽會從頭開始讀partition(none)
 8 kafka.consumer.auto.offset.reset=latest
 9 kafka.consumer.concurrency=10
10 
11 kafka.producer.servers=*:9092
12 kafka.producer.retries=0
13 kafka.producer.batch.size=4096
14 #//往kafka服務器提交消息間隔時間,0則立即提交不等待
15 kafka.producer.linger=1
16 kafka.producer.buffer.memory=40960

啟動類

@SpringBootApplication
@EnableScheduling
public class Application {

    @Autowired
    
private KafkaSender kafkaSender; public static void main(String[] args) { SpringApplication.run(Application .class, args); } //然後每隔1分鐘執行一次 @Scheduled(fixedRate = 1000 * 60) public void testKafka() throws Exception { kafkaSender.sendTest(); } }

生產者:

 1 @Component
2 public class KafkaSender { 3 4 @Resource 5 KafkaConsumerPool consumerPool; 6 7 /** 8 * 這裏需要放到程序啟動完成之後執行 TODO 9 */ 10 @PostConstruct 11 void d(){ 12 13 ConsumerGroup consumerThread = new ConsumerGroup("gropu-1","access_data",consumerConfig); 14 ConsumerGroup consumerThread2 = new ConsumerGroup("gropu-2","access_data", consumerConfig); 15 16 /** 17 * 各起兩個消費者 ,Kafka consumer是非線程安全的 Consumer 需要一個new 的 18 */ 19 consumerPool.SubmitConsumerPool(new Consumer(consumerThread)); 20 consumerPool.SubmitConsumerPool(new Consumer(consumerThread)); 21 22 consumerPool.SubmitConsumerPool(new Consumer(consumerThread2)); 23 consumerPool.SubmitConsumerPool(new Consumer(consumerThread2)); 24 } 25 26 27 @Resource 28 KafkaConsumerConfig consumerConfig; 29 30 @Autowired 31 private KafkaTemplate kafkaTemplate; 32 33 @Autowired 34 private KafkaTopics kafkaTopics; 35 36 /** 37 * 發送消息到kafka 38 * 39 */ 40 public void sendTest() throws InterruptedException, IOException, KeeperException { 41 42 /** 43 * topic=‘access_data‘ 44 */ 45 kafkaTemplate.send("access_data",""+ System.currentTimeMillis()); 46 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 47 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 48 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 49 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 50 kafkaTemplate.send("access_data",""+System.currentTimeMillis()); 51 } 52 53 54 }
KafkaProducerConfig
/**
 * Producer-side Kafka configuration. Values are bound from the
 * {@code kafka.producer.*} properties and exposed as a
 * {@code KafkaTemplate<String, String>} bean.
 */
@Configuration
@EnableKafka
public class KafkaProducerConfig {

    @Value("${kafka.producer.servers}")
    private String servers;
    @Value("${kafka.producer.retries}")
    private int retries;
    @Value("${kafka.producer.batch.size}")
    private int batchSize;
    @Value("${kafka.producer.linger}")
    private int linger;
    @Value("${kafka.producer.buffer.memory}")
    private int bufferMemory;

    /**
     * Builds the raw producer property map from the injected values.
     * Keys and values are both serialized with {@code StringSerializer}.
     *
     * @return a new mutable map of producer settings
     */
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, linger);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    /**
     * Producer factory backing the template.
     *
     * Declared as a bean so that the container owns the factory and can run
     * its destroy callback on context shutdown; previously the factory was
     * created outside the container and never released.
     *
     * @return the producer factory configured from {@link #producerConfigs()}
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * @return the template used by application code to publish messages
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}
KafkaConsumerConfig
 1 @Configuration
 2 @EnableKafka
 3 public class KafkaConsumerConfig {
 4 
 5     @Value("${kafka.consumer.zookeeper.connect}")
 6     public String zookeeperConnect;
 7     @Value("${kafka.consumer.servers}")
 8     public  String servers;
 9     @Value("${kafka.consumer.enable.auto.commit}")
10     public  boolean enableAutoCommit;
11     @Value("${kafka.consumer.session.timeout}")
12     public  String sessionTimeout;
13     @Value("${kafka.consumer.auto.commit.interval}")
14     public  String autoCommitInterval;
15     @Value("${kafka.consumer.auto.offset.reset}")
16     public  String autoOffsetReset;
17     @Value("${kafka.consumer.concurrency}")
18     public  int concurrency;
19 
20 
21     public String getZookeeperConnect() {
22         return zookeeperConnect;
23     }
24 
25     public void setZookeeperConnect(String zookeeperConnect) {
26         this.zookeeperConnect = zookeeperConnect;
27     }
28 
29     public String getServers() {
30         return servers;
31     }
32 
33     public void setServers(String servers) {
34         this.servers = servers;
35     }
36 
37     public boolean isEnableAutoCommit() {
38         return enableAutoCommit;
39     }
40 
41     public void setEnableAutoCommit(boolean enableAutoCommit) {
42         this.enableAutoCommit = enableAutoCommit;
43     }
44 
45     public String getSessionTimeout() {
46         return sessionTimeout;
47     }
48 
49     public void setSessionTimeout(String sessionTimeout) {
50         this.sessionTimeout = sessionTimeout;
51     }
52 
53     public String getAutoCommitInterval() {
54         return autoCommitInterval;
55     }
56 
57     public void setAutoCommitInterval(String autoCommitInterval) {
58         this.autoCommitInterval = autoCommitInterval;
59     }
60 
61     public String getAutoOffsetReset() {
62         return autoOffsetReset;
63     }
64 
65     public void setAutoOffsetReset(String autoOffsetReset) {
66         this.autoOffsetReset = autoOffsetReset;
67     }
68 
69     public int getConcurrency() {
70         return concurrency;
71     }
72 
73     public void setConcurrency(int concurrency) {
74         this.concurrency = concurrency;
75     }
76 }
Consumer
/**
 * Actual consumer worker. It extends {@code ShutdownableThread}; additional
 * consumer kinds can be added by extending it in the same way.
 *
 * @create 2017-11-06 12:42
 * @update 2017-11-06 12:42
 **/
public class Consumer extends ShutdownableThread {

    /**
     * Underlying Kafka consumer, obtained from the {@link ConsumerGroup}
     * passed to the constructor.
     */
    private final KafkaConsumer<Integer, String> consumer;

    /**
     * Topic this worker subscribes to.
     */
    private final String topic;

    /**
     * Consumer group id; used only for log output here.
     */
    private final String groupId;

    /**
     * Creates a worker bound to the consumer, topic and group id supplied by
     * {@code consumerGroup}.
     *
     * @param consumerGroup source of the Kafka consumer, topic and group id
     */
    @SuppressWarnings("unchecked") // ConsumerGroup#getConsumer() returns a raw KafkaConsumer
    public Consumer(ConsumerGroup consumerGroup) {
        super("", false);
        this.consumer = consumerGroup.getConsumer();
        this.topic = consumerGroup.getTopic();
        this.groupId = consumerGroup.getA_groupId();
    }

    /**
     * Subscribes to the topic, polls once (1000 ms timeout) and prints every
     * record received.
     *
     * Consumers that share a group id split the topic's messages between
     * them, so each message is handled by only one of them. Consumers in
     * different groups listening to the same topic each receive every
     * message (broadcast). To get broadcast delivery, give every listener
     * its own group id.
     */
    @Override
    public void doWork() {
        consumer.subscribe(Collections.singletonList(this.topic));
        ConsumerRecords<Integer, String> records = consumer.poll(1000);
        for (ConsumerRecord<Integer, String> record : records) {
            // Report the partition of this record, not the set of all
            // partitions present in the polled batch.
            System.out.println("Thread: " + Thread.currentThread().getName()
                    + "Received message: (" + this.groupId + ", " + record.value() + ") at offset "
                    + record.offset() + " partition : " + record.partition());
        }
    }
}
ConsumerGroup 設置消費組
 1 public class ConsumerGroup  {
 2 
 3     /**
 4      *  日誌處理
 5      */
 6     private static final Log log = LogFactory.getLog(ConsumerGroup.class);
 7 
 8     /**
 9      *  topic
10      */
11     private final String topic;
12 
13     /**
14      *  公共連接屬性
15      */
16     private  Properties props ;
17 
18     /**
19      * 消費者組
20      */
21     private final String groupId;
22 
23 
24     public ConsumerGroup(String groupId, String topic, KafkaConsumerConfig consumerConfig) {
25         createConsumerConfig(groupId,consumerConfig);
26         this.topic = topic;
27         this.groupId = groupId;
28     }
29 
30 
31     private Properties createConsumerConfig(String groupId, KafkaConsumerConfig consumerConfig) {
32         props = new Properties();
33         props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,consumerConfig.servers);
34         props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
35         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, consumerConfig.enableAutoCommit);
36         props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, consumerConfig.autoCommitInterval);
37         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, consumerConfig.sessionTimeout);
38         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.IntegerDeserializer");
39         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
40         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, consumerConfig.autoOffsetReset);
41         // 其他配置再配置
42         return props;
43     }
44 
45     public KafkaConsumer getConsumer() {
46         return new KafkaConsumer(props);
47     }
48 
49     /**
50      *  其他類獲取topic
51      * @return
52      */
53     public String getTopic() {
54         return topic;
55     }
56 
57     public String getA_groupId() {
58         return groupId;
59     }
60 }
 1 @Component
 2 public class KafkaConsumerPool {
 3 
 4     /**
 5      * 日誌處理
 6      */
 7     private static final Log log = LogFactory.getLog(KafkaConsumerPool.class);
 8 
 9     /**
10      *  線程池
11      */
12     private ExecutorService executor;
13 
14     /**
15      * 初始化10個線程
16      */
17     @PostConstruct
18     void init(){
19         executor = Executors.newFixedThreadPool(10);
20     }
21 
22     /**
23      * 提交新的消費者
24      *
25      * @param shutdownableThread
26      */
27     public void SubmitConsumerPool(ShutdownableThread shutdownableThread) {
28         executor.execute(shutdownableThread);
29     }
30 
31     /**
32      * 程序關閉,關閉線程池
33      *
34      */
35     @PreDestroy
36     void fin(){
37         shutdown();
38     }
39 
40     public void shutdown() {
41         if (executor != null) executor.shutdown();
42         try {
43             if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
44                 log.info("Timed out waiting for consumer threads to shut down, exiting uncleanly");
45             }
46         } catch (InterruptedException e) {
47             log.info("Interrupted during shutdown, exiting uncleanly");
48         }
49     }
50 }

spring boot 集成kafka (多線程,消費者使用kafka的原生api實現,因為@KafkaListener修改groupId無效)