Spring Boot整合kafka筆記
kafka官網 http://kafka.apache.org/quickstart
spring-kafka當前穩定版本是1.2.0..RELEASE http://docs.spring.io/spring-kafka/docs/1.2.0.RELEASE/reference/html/_introduction.html
1.首先下載kafka,解壓縮,然後執行zookeeper和kafka
cd kafka_2.11-0.10.2.0
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
經過測試發現Java程式傳送訊息時自動建立了主題,所以不需要用命令單獨建立主題了(如果用命令列producer來繫結主題topic必須要先用命令列建立一個topic)
2.按照spring-kafka官網整合文件
1)pom中新增依賴
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>0.10.2.0</version> </dependency> <dependency><groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>1.2.0.RELEASE</version> </dependency>
2)配置,spring boot1.5.1可以直接在application.yml 配置檔案中配置,官方文件說只要配置兩個必要項就可以了
spring: kafka: consumer: group-id: foo auto-offset-reset:group-id是所屬分組(kafka要求一個分組的成員數量不能大於設定的分割槽數,否則同一個分組中多出的成員永遠都不會收到訊息),auto-offset-reset設為earliest是獲取以前就有過的訊息(其實不是必要的配置)earliest
可以在配置檔案中進行更完整的配置,但是配置檔案中支援的預設配置項還是有限,如果需要更加完整的配置建議還是用bean配置
spring: kafka: producer: retries: 0 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer bootstrap-servers: localhost:9092 consumer: bootstrap-servers: localhost:9092 group-id: foo auto-offset-reset: earliest enable-auto-commit: true auto-commit-interval: 100 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.Java部分就是spring-kafka官網使用的程式碼貼上過來了,可以直接在application中同時執行producer和consumer
import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.builder.SpringApplicationBuilder; import org.springframework.boot.web.support.SpringBootServletInitializer; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @SpringBootApplication @Slf4j public class MyApplication extends SpringBootServletInitializer implements CommandLineRunner { @Override protected SpringApplicationBuilder configure(SpringApplicationBuilder application) { return application.sources(MyApplication.class); } public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @Autowired private KafkaTemplate<String, String> template; private final CountDownLatch latch = new CountDownLatch(4); @Override public void run(String... args) throws Exception { this.template.send("myTopic", "foo1"); this.template.send("myTopic", "foo2"); this.template.send("myTopic", "foo3"); this.template.send("myTopic", "hi", "foo4");
this.template.send("myTopic2", "2", "foo5");latch.await(60, TimeUnit.SECONDS);log.info("All received");} @KafkaListener(topics = "myTopic") public void listen(ConsumerRecord<?, ?> cr) throws Exception { log.info(cr.toString());latch.countDown();}}
可以發現傳送訊息非常簡單,先自動注入一個template,然後template.send(主題,data)或者template.send(主題,key, data)即可
客戶端consumer獲取訊息特別簡單,直接用@KafkaListener註解即可,並在監聽中設定監聽的主題,topics是一個數組可以繫結多個主題的,上面的程式碼中修改為@KafkaListener(topics = {"myTopic","myTopic2"})就可以同時監聽兩個主題的訊息了,但是需要注意的是開始監聽主題前要求主題已經被建立好了。
4.上面只是程式建立producer和consumer,還可以在命令視窗傳送訊息和接受訊息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic myTopic --from-beginning
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic myTopic2
在producer命令視窗傳送訊息,在consumer命令視窗和spring-boot程式中都可以接收到訊息。注意的是用命令列建立的producer繫結的主題topic需要用命令列先建立topic,但是在前面的Java程式中已經發送過myTopic和myTopic2兩個主題訊息,主題已經被Java程式建立好了,所以這裡就不需要建立了
demo原始碼下載http://git.oschina.net/liufang1991/kafkademo