1. 程式人生 > >spring-boot 整合kafka單節點訊息傳送與接收

spring-boot 整合kafka單節點訊息傳送與接收

springboot還處於學習階段,又同時在學習kafka,兩者結合,繼續學習。

1、官網下載kafka

2、解壓

3、對於單節點來說,按照官網上操作即可實現訊息的傳送和接收。

但是對於客戶端,是通過 @KafkaListener 註解監聽生產者傳送的訊息,故需要修改config/server.properties 檔案

這裡寫圖片描述

如上圖,開啟listeners ,將預設的件監聽IP+埠改為具體的伺服器地址

4、建立生產者,相關配置

public staticMapproducerConfigs() {

Mapprops =newHashMap<>();

props.put
(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.0.90:9092"); // broker 配置 props.put(ProducerConfig.RETRIES_CONFIG,0);//重試次數 props.put(ProducerConfig.BATCH_SIZE_CONFIG,4096);//快取每個topic-partition對應訊息佇列集合 props.put(ProducerConfig.LINGER_MS_CONFIG,1); props.put(ProducerConfig.BUFFER_MEMORY_CONFIG,40960);// Producer非同步佇列記憶體的最大值
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,StringSerializer.class); //序列化 props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class); returnprops; }

5、重寫kafkaTemplate類方法,

   @Bean
    public static KafkaTemplate<String, String> kafkaTemplate() {
        return
new KafkaTemplate<String, String>(producerFactory()); } public static ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); }

指向自己的config配置

6、消費者配置

public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.90:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); //反序列化
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);//反序列化
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        return propsMap;
    }
    @Bean
    public Listener listener() {
        return new Listener();
    }

監聽使用@KafkaListener註解

@KafkaListener(topics={"test1"})
    public void listen(ConsumerRecord<?, ?> record) {
        Gson gson = new Gson();
        log.error("監聽器" + gson.toJson(record));
        Optional<?> kafkaMessage = Optional.ofNullable(record.value());
        if (kafkaMessage.isPresent()) {
            Object message = kafkaMessage.get();
            System.out.println("listen1 " + message);
            //對訊息的處理
        }
    }

訊息傳送

 @RequestMapping("/sendMsg")
    @ResponseBody
    public void sendMsg(String msg){
        KafkaTemplate<String, String> kafkaTemplate =  producerConfig.kafkaTemplate();
        kafkaTemplate.send("test1","測試訊息接受傳送");

    }

消費者接收到的資訊:

{"topic":"test1","partition":0,"offset":1,"timestamp":1496738649398,"timestampType":"CREATE_TIME","checksum":460720665,"serializedKeySize":-1,"serializedValueSize":24,"value":"測試訊息接受傳送"}

相關推薦

spring-boot 整合kafka節點訊息傳送接收

springboot還處於學習階段,又同時在學習kafka,兩者結合,繼續學習。 1、官網下載kafka 2、解壓 3、對於單節點來說,按照官網上操作即可實現訊息的傳送和接收。 但是對於客戶端,是通過 @KafkaListener 註解監聽生產者傳送的訊

spring boot整合kafka

1.pom.xml <dependency> <groupId> org.apache.kafka</groupId> <artifactId> kafka_2.10</artifactId&

Spring Boot整合kafka筆記

kafka官網 http://kafka.apache.org/quickstartspring-kafka當前穩定版本是1.2.0..RELEASE  http://docs.spring.io/spring-kafka/docs/1.2.0.RELEASE/reference/html/_introduc

spring boot 整合kafka 採用手動提交時報錯

1、問題 spring boot 整合kafka,採用手動提交時報錯(A manual ackmode is required for an acknowledging listener)。具體報錯如

SpringBoot通過kafka實現訊息傳送接收(包括不能傳送和消費kafka訊息的採坑記錄)

kafka採坑記錄:     1、kafka服務端server.properties中的broker.id叢集內需要唯一。     2、kafka config檔案中listeners和advertised.listeners需要配置本機ip:9092地址,不然消費不到資

Scala:簡單使用Actor的訊息傳送接收求和

從Scala的諸多介紹當中,就看到了不少特別指出Scala中的Actor能夠實現並行程式設計的強大功能,它是基於事件模型的併發機制。或者說,Scala是運用訊息(message)的傳送、接收來實現多執行緒的。使用Scala能夠更容易地實現多執行緒應用的開發。  說到並行與訊息傳送、接收,我記起了上學期“平行

使用Akka持久化——訊息傳送接收

前言在《使用Akka持久化——持久化與快照》一文中介紹瞭如何使用Akka持久化訊息及生成快照。對於叢集應用來說,傳送者發出訊息,只有當收到了接受者的成功回覆才應當認為是一次完整的請求和應答(一些RPC框架只提供了遠端呼叫、序列化/反序列化的機制,但是具體呼叫的成功與否實際是拋

Spring Boot整合websocket實現群聊,點對點聊天,圖片傳送,音訊傳送

參考:基於https://blog.csdn.net/qq_38455201/article/details/80374712  基礎上進行新增圖片傳送和音訊傳送功能   單點圖片傳送: 單點音訊傳送: 音訊傳送相關js參考:https://github.

spring boot整合Shiro實現點登入

前面的部落格中,我們說道了Shiro的兩個最大的特點,認證和授權,而單點登入也是屬於認證的一部分,預設情況下,Shiro已經為我們實現了和Cas的整合,我們加入整合的一些配置就ok了。 1、加入shiro-cas包 <!-- shiro整合cas單點 -->

SpringCloud學習筆記015---Spring Boot整合RabbitMQ傳送接收JSON

在Spring Boot 整合RabbitMQ一文中介紹瞭如何整合RabbitMQ。預設情況下發送的訊息是轉換為位元組碼,這裡介紹一下如何傳送JSON資料。 ObjectMapper 最簡單傳送JSON資料的方式是把物件使用ObjectMapper等JSON工具類把物件轉

Spring boot 整合JavaMail服務傳送郵件

JavaMail是SUN提供給廣大Java開發人員的一款郵件傳送和接受的一款開源類庫,支援常用的郵件協議,如:SMTP、POP3、IMAP,開發人員使用JavaMail編寫郵件程式時,不再需要考慮底層的通訊細節如:Socket而是關注在邏輯層面。JavaMail可以傳送各種

Spring Boot整合es叢集,訊息存放ES

ES叢集引數配置: elasticsearch.clustername = im_es elasticsearch.cluster.nodes = 10.**.**.**,10.**.**.**,10.**.**.** elasticsearch.cluster.port

Spring整合JMS、IBM MQ傳送接收訊息

最近才接觸到MQ,由於之前完全不知道是幹嘛用的,還是很花了一點時間研究的~先來簡單解釋一下名詞啦 一、名詞解釋 MQ MQ(message queue)指訊息佇列,是應用程式對應用程式的通訊方法。可以利用訊息佇列暫存資料報文。 MQ的原理其實就是生產者

kafka學習筆記(三)spring boot整合kafka0.9.0.1(使用配置類)

spring boot 版本:1.5.6引入關於kafka的相關jar         <dependency>          <groupId>org.springframework.kafka</groupI

Spring kafka 學習之二 採用java 配置類方式傳送接收訊息

參考資料:https://docs.spring.io/spring-kafka/reference/html/_introduction.html#compatibilityspring-kafka 版本:2.1.5.release1、配置類package com.hdsx

spring boot 整合activeMQ訊息佇列

  在Spring Boot中整合ActiveMQ相對還是比較簡單的,都不需要安裝什麼服務,預設使用記憶體的activeMQ,當然配合ActiveMQ Server會更好。在這裡我們簡單介紹怎麼使用,本節主要分以下幾個步驟: (1) 新建Maven Java Project

[ Spring Boot ] 整合 Websocket 實現訊息推送框架的設計筆記

前段時間,專案中用Websocket實現了一套後臺向前端推送的Service層搭建,感興趣的童鞋可以瞭解下^_^Maven pom<dependency> <groupId&g

spring整合apache activemq實現訊息傳送的三種方式程式碼配置例項

我們專案中傳送事件告警要用到訊息佇列,所以學習了下activemq,整理如下: activemq的介紹就不用說了,官網上大家可以詳細的看到。 1.下載並安裝activemq:地址http://activemq.apache.org/activemq-590-rel

spring boot 整合activemq 進行服務端訊息推送(web頁面)

最近公司的專案裡有需要服務端向web端實時推送訊息的需求,網上搜索了一番,有前端頁面通過定時任務向後臺傳送ajax請求重新整理,有使用第三方提供的訊息服務(GoEasy),前者因為會有很多請求是無用的,容易加大伺服器負荷造成宕機,後者現在收費了(免費的也只能用一

spring-boot整合dubbo:Spring-boot-dubbo-starter

hub pack 自動配置 china end service get exceptio 整合 為什麽要寫這個小工具 如果你用過Spring-boot來提供dubbo服務,相信使用中有很多“不爽”的地方。既然使用spring boot,那麽能用註解的地方絕不用xml配置,這