1. 程式人生 > >Spring Boot整合kafka筆記

Spring Boot整合kafka筆記

kafka官網 http://kafka.apache.org/quickstart

spring-kafka當前穩定版本是1.2.0..RELEASE  http://docs.spring.io/spring-kafka/docs/1.2.0.RELEASE/reference/html/_introduction.html

1.首先下載kafka,解壓縮,然後執行zookeeper和kafka

cd kafka_2.11-0.10.2.0

bin/zookeeper-server-start.sh config/zookeeper.properties

bin/kafka-server-start.sh config/server.properties

經過測試發現Java程式傳送訊息時自動建立了主題,所以不需要用命令單獨建立主題了(如果用命令列producer來繫結主題topic必須要先用命令列建立一個topic)

2.按照spring-kafka官網整合文件

1)pom中新增依賴

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.2.0</version>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.0.RELEASE</version> </dependency>

2)配置,spring boot1.5.1可以直接在application.yml 配置檔案中配置,官方文件說只要配置兩個必要項就可以了

spring:
  kafka:
    consumer:
      group-id: foo
      auto-offset-reset: 
earliest
group-id是所屬分組(kafka要求一個分組的成員數量不能大於設定的分割槽數,否則同一個分組中多出的成員永遠都不會收到訊息),auto-offset-reset設為earliest是獲取以前就有過的訊息(其實不是必要的配置)

可以在配置檔案中進行更完整的配置,但是配置檔案中支援的預設配置項還是有限,如果需要更加完整的配置建議還是用bean配置

spring:
  kafka:
    producer:
      retries: 0
      batch-size: 16384
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      bootstrap-servers: localhost:9092
    consumer:
      bootstrap-servers: localhost:9092
      group-id: foo
      auto-offset-reset: earliest
      enable-auto-commit: true
      auto-commit-interval: 100
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.Java部分就是spring-kafka官網使用的程式碼貼上過來了,可以直接在application中同時執行producer和consumer

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.boot.web.support.SpringBootServletInitializer;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@SpringBootApplication
@Slf4j
public class MyApplication extends SpringBootServletInitializer implements CommandLineRunner {

   @Override
protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
      return application.sources(MyApplication.class);
}

   public static void main(String[] args) {
      SpringApplication.run(MyApplication.class, args);
}

   @Autowired
private KafkaTemplate<String, String> template;
   private final CountDownLatch latch = new CountDownLatch(4);
@Override
public void run(String... args) throws Exception {
      this.template.send("myTopic", "foo1");
      this.template.send("myTopic", "foo2");
      this.template.send("myTopic", "foo3");
      this.template.send("myTopic", "hi", "foo4");
      this.template.send("myTopic2", "2", "foo5");
latch.await(60TimeUnit.SECONDS);log.info("All received");@KafkaListener(topics "myTopic"public void listen(ConsumerRecord<??> cr) throws Exception { log.info(cr.toString());latch.countDown();}}

可以發現傳送訊息非常簡單,先自動注入一個template,然後template.send(主題,data)或者template.send(主題,key, data)即可

客戶端consumer獲取訊息特別簡單,直接用@KafkaListener註解即可,並在監聽中設定監聽的主題,topics是一個數組可以繫結多個主題的,上面的程式碼中修改為@KafkaListener(topics = {"myTopic","myTopic2"})就可以同時監聽兩個主題的訊息了,但是需要注意的是開始監聽主題前要求主題已經被建立好了。

4.上面只是程式建立producer和consumer,還可以在命令視窗傳送訊息和接受訊息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic  --from-beginning

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic2

在producer命令視窗傳送訊息,在consumer命令視窗和spring-boot程式中都可以接收到訊息。注意的是用命令列建立的producer繫結的主題topic需要用命令列先建立topic,但是在前面的Java程式中已經發送過myTopic和myTopic2兩個主題訊息,主題已經被Java程式建立好了,所以這裡就不需要建立了

demo原始碼下載http://git.oschina.net/liufang1991/kafkademo