1. 程式人生 > >springboot+rabbitmq+websocket廣播模式進行訊息實時推送

springboot+rabbitmq+websocket廣播模式進行訊息實時推送

如何安裝rabbitmq在此就不再贅述了,直接上程式碼,使用的direct佇列模式。

依賴

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>

生產者

application.properties

#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#開啟訊息傳送確認
spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-returns=true
#返回無法插入佇列的訊息
spring.rabbitmq.template.mandatory=true

準備訊息佇列與路由key

public class RabbitConstant {
    //交換機名稱
    public final static String EXCHANGE = "exchange_test";
    //佇列
    public final static String QUEUE_TRANSACTION = "queue_transaction";
    public final static String QUEUE_CONTRACT = "queue_contract";
    public final static String QUEUE_QUALIFICATION = "queue_qualification";
    //路由key
    public final static String RK_TRANSACTION = "transaction";
    public final static String RK_CONTRACT = "contract";
    public final static String RK_QUALIFICATION = "qualification";
}

配置

@Configuration
public class RabbitMqConfig {

    /**
     * 宣告佇列
     *
     * @return
     */
    @Bean
    public Queue queueTransaction() {
        // true表示持久化該佇列
        return new Queue(RabbitConstant.QUEUE_TRANSACTION, true);
    }

    @Bean
    public Queue queueContract() {
        // true表示持久化該佇列
        return new Queue(RabbitConstant.QUEUE_CONTRACT, true);
    }

    @Bean
    public Queue queueQualification() {
        // true表示持久化該佇列
        return new Queue(RabbitConstant.QUEUE_QUALIFICATION, true);
    }

    /**
     * 宣告互動器
     *
     * @return
     */
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(RabbitConstant.EXCHANGE);
    }

    /**
     * 繫結
     *
     * @return
     */
    @Bean
    public Binding bindingTransaction() {
        return BindingBuilder.bind(queueTransaction()).to(directExchange()).with(RabbitConstant.RK_TRANSACTION);
    }

    /**
     * 繫結
     *
     * @return
     */
    @Bean
    public Binding bindingContract() {
        return BindingBuilder.bind(queueContract()).to(directExchange()).with(RabbitConstant.RK_CONTRACT);
    }

    /**
     * 繫結
     *
     * @return
     */
    @Bean
    public Binding bindingQualification() {
        return BindingBuilder.bind(queueQualification()).to(directExchange()).with(RabbitConstant.RK_QUALIFICATION);
    }

}

釋出者實現介面

@Component
@Slf4j
public class Sender implements RabbitTemplate.ConfirmCallback, ReturnCallback {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnCallback(this);
    }

    //訊息傳送確認回撥方法
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info("訊息傳送成功:" + correlationData);
    }

    //訊息傳送失敗回撥方法
    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.error("訊息傳送失敗:" + new String(message.getBody()));
    }

    /**
     * 傳送訊息,不需要實現任何介面,供外部呼叫
     *
     * @param messageVo
     */
    public void send(MessageVo messageVo) {
        CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
        rabbitTemplate.convertAndSend(RabbitConstant.EXCHANGE, messageVo.getRoutingKey(), JSONObject.toJSON(messageVo).toString(), correlationId);
    }
}

Ok,只要呼叫send方法就可一把訊息傳送到指定的佇列中了,傳送後會呼叫confirm或者returnedMessage方法來返回結果,

下面開始講消費者。

消費者

application.properties

#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

消費者配置

import com.rabbitmq.client.Channel;
import com.yxd.rabbitmq.web.websocket.WebSocketServerEndpoint;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListenerConfigurer;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.rabbit.listener.RabbitListenerEndpointRegistrar;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.converter.MappingJackson2MessageConverter;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;

import java.io.IOException;


@Configuration
@EnableRabbit
@Slf4j
public class ConsumerConfig implements RabbitListenerConfigurer {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private WebSocketServerEndpoint webSocketServerEndpoint;

    @Bean
    public DefaultMessageHandlerMethodFactory handlerMethodFactory() {
        DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
        factory.setMessageConverter(new MappingJackson2MessageConverter());
        return factory;
    }

    @Bean
    public SimpleMessageListenerContainer messageContainer(@Qualifier("queueTransaction") Queue transaction, @Qualifier("queueContract") Queue contract, @Qualifier("queueQualification") Queue qualification) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(transaction, contract, qualification);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL); //設定確認模式手工確認
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                log.info("receive msg : " + new String(body));
                try {
                    webSocketServerEndpoint.sendMessageToAll(new String(body));
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);//確認訊息成功消費
                } catch (IOException e) {
                    log.error("訊息推送前臺出錯:" + e.getMessage() + "/r/n重新發送");
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); //重新發送
                }
            }
        });
        return container;
    }

    @Override
    public void configureRabbitListeners(RabbitListenerEndpointRegistrar rabbitListenerEndpointRegistrar) {
        rabbitListenerEndpointRegistrar.setMessageHandlerMethodFactory(handlerMethodFactory());
    }
}

配置完後會自動監聽訊息佇列,AcknowledgeMode設定為手動模式可以自己來管理是否確認消費成功或者重新發送,當然我這裡重新發送會不斷的傳送,到時候根據具體業務進行處理就行了。

websocket

配置

@Configuration
public class WebSocketConfig {

    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

socket服務

package com.yxd.rabbitmq.web.websocket;

import com.alibaba.fastjson.JSONObject;
import com.yxd.rabbitmq.model.MessageVo;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * ServerEndpoint
 * <p>
 * 使用springboot的唯一區別是要@Component宣告下,而使用獨立容器是由容器自己管理websocket的,但在springboot中連容器都是spring管理的。
 * <p>
 * 雖然@Component預設是單例模式的,但springboot還是會為每個websocket連線初始化一個bean,所以可以用一個靜態set儲存起來。
 */
@ServerEndpoint("/ws/yxd/{userId}") //WebSocket客戶端建立連線的地址
@Component
@Slf4j
public class WebSocketServerEndpoint {

    /**
     * 存活的session集合(使用執行緒安全的map儲存)
     */
    private static Map<String, Session> livingSessions = new ConcurrentHashMap<>();

    /**
     * 建立連線的回撥方法
     *
     * @param session 與客戶端的WebSocket連線會話
     * @param userId  使用者名稱,WebSocket支援路徑引數
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("userId") String userId) {
        livingSessions.put(session.getId(), session);
        log.info(userId + "進入連線");
    }

    @OnMessage
    public void onMessage(String message, Session session, @PathParam("userId") String userId) {
        log.info(userId + " : " + message);
        sendMessageToAll(userId + " : " + message);
    }


    @OnError
    public void onError(Session session, Throwable error) {
        log.info("發生錯誤");
        log.error(error.getStackTrace() + "");
    }


    @OnClose
    public void onClose(Session session, @PathParam("userId") String userId) {
        livingSessions.remove(session.getId());
        log.info(userId + " 關閉連線");
    }

    /**
     * 單獨傳送訊息
     *
     * @param session
     * @param message
     */
    public void sendMessage(Session session, String message) {
        try {
            session.getBasicRemote().sendText(message);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 群發訊息
     *
     * @param message
     */
    public void sendMessageToAll(String message) {
        MessageVo messageVo = JSONObject.parseObject(message, MessageVo.class);
        livingSessions.forEach((sessionId, session) -> {
            //發給指定的接收使用者
//            if (userId.equals(messageVo.getReceiveUserId())) {
            sendMessage(session, message);
//            }
        });
    }

}

前臺

我這裡使用的是vue

queueReceiveSetting: {//訊息佇列配置
          websock: null,
          client: null,
          wsuri: "ws://localhost:8111/rabbitmq/ws/huhy/lxy",
        },

initWebSocket() {
        //ws地址
        if (this.queueReceiveSetting.websock) {
          this.queueReceiveSetting.websock.close()
        }
        this.queueReceiveSetting.websock = new WebSocket(this.queueReceiveSetting.wsuri);
        this.queueReceiveSetting.websock.onopen = function (res) {
          console.log("開啟連線")
        };
        this.queueReceiveSetting.websock.onmessage = function (res) {
          let data = JSON.parse(res.data)
          console.log("收到資料:" + data.content)
          Message({
            message: data.content,
            type: 'info',
            showClose: true,
            duration: 3 * 1000
          })
        };
        this.queueReceiveSetting.websock.onclose = function (res) {
          console.log("連線關閉")
        };
        this.queueReceiveSetting.websock.onerror = function (res) {
          console.log("連接出錯")
          // this.initWebSocket();
        };
      }
這樣就能建立連線,進行訊息推送了。因為是自己剛寫的,還有很多不足,希望大家也能提出來^_^

相關推薦

springboot+rabbitmq+websocket廣播模式進行訊息實時

如何安裝rabbitmq在此就不再贅述了,直接上程式碼,使用的direct佇列模式。依賴 <dependency> <groupId>org.springframework.boot</groupId>

JavaScript仿通知欄新訊息實時更新效果

用到websocket做訊息推送,其中有這樣的一個效果,在訊息列表的模組,接收到很多條資訊,展示在介面的是最近的十條接受到的訊息,實時更新模組,每次接受一條訊息,都會展示在最頂部,舊的訊息排列在地下,大致效果如下所示。 用js仿寫了一個簡單的,關於通知欄新訊息實時推送更新效果,程式碼如下:

轉載自-亭鈺wang,php 訊息實時(反ajax

入口檔案index.html <!DOCTYPE HTML> <html> <head>     <title>反ajax推送</title>     <style>  

WebSocket 教程,關於伺服器實時

原文出處: http://www.ruanyifeng.com/blog/2017/05/websocket.html WebSocket 是一種網路通訊協議,很多高階功能都需要它。 本文介紹 WebSocket 協議的使用方法。 一、為什麼需要 WebSocket?

Asp.net SignalR 實現服務端訊息實時到所有Web端

ASP .NET SignalR是一個ASP .NET 下的類庫,可以在ASP .NET 的Web專案中實現實時通訊。實際上 Asp.net SignalR 2 實現 服務端訊息推送到Web端, 更加簡單

WebSocket和kafka實現資料實時到前端

一. 需求背景 最近新接觸一個需求,需要將kafka中的資料實時推送到前端展示。最開始想到的是前端輪詢介面資料,但是無法保證輪詢的頻率和消費的頻率完全一致,或造成資料缺失等問題。最終確定用利用WebSocket實現資料的實時推送。 二. websocket簡介 網上已經有好

Java Spring WebSocket實現後端訊息主動

這篇文章將介紹如何構建一個簡單的WebSocket訊息推送Demo使用eclipse建立maven專案後引入相關的依賴jar包,如下:<?xml version="1.0" encoding="UTF-8"?><project xmlns="http://m

利用 socket.io 實現訊息實時

專案背景介紹 最近在寫的專案中存在著社交模組,需要實現這樣的一個功能:當發生了使用者被點贊、評論、關注等操作時,需要由伺服器向用戶實時地推送一條訊息。最終完成的專案地址為:https://github.com/noiron/socket-message-push,這裡將介紹一下實現的思路及部分程式碼。

C/C++ 基於 websocket 的前臺及後臺實時

實現步驟如下: 1. 獲取 GoEasy appkey. 在 goeasy 官網上註冊一個賬號,並新建一個 app. APP 建立好後系統會為該 app 自動生成兩個 key, 一個既可以用來接收又可以用來推送 (supper key),另一個只可以用來接收(subscr

SpringBoot整合WebSocket【基於STOMP協議】進行點對點[一對一]和廣播[一對多]實時,內附簡易聊天室demo

最近專案來了新需求,需要做一個實時推送的功能,伺服器主動推送訊息給客戶端,在網上經過一輪搜查之後,確定使用WebSocket來進行開發。以前經常聽說WebSocket的神奇之處,如今終於可以嘗試使用它了。1.淺談WebSocketWebSocket是在HTML5基礎上單個TC

SpringBoot整合WebSocket【基於純H5】進行點對點[一對一]和廣播[一對多]實時

之前實現WebSocket基於STOMP的,覺得SpringBoot封裝的太高,不怎麼靈活,現在實現一個純H5的,也大概瞭解webSocket在內部是怎麼傳輸的。1.環境搭建因為在上一篇基於STOMP協議實現的WebSocket裡已經有大概介紹過Web的基本情況了,所以在這篇

RabbitMQ訂閱釋出的訊息,通過WebSocket實現資料實時到前端上顯

一、架構簡單概述  RabbitMQ訊息佇列服務善於解決多系統、異構系統間的資料交換(訊息通知/通訊)問題,並且可以訂閱和釋出,而隨著HTML5誕生的WebSocket協議實現了瀏覽器與伺服器的全雙工通訊,擴充套件了瀏覽器與服務端的通訊功能,使服務端也能主動向客戶端傳送資料

Python-RabbitMQ fanout廣播模式

空閑 ron count 現實 打開 公平分發 簡單 實現 mage 現實生活中,機器的配置不同,有的性能高,有的性能低,有可能A機器處理了四條消息,B機器一條消息還沒處理完,要是按照輪詢方式來分發消息的話,那就是A機器一直是空閑,B機器有一大丟消息處理不完。我們在部署集群

Python-RabbitMQ direct廣播模式

shadow sha vpd 一個 ima info mar 技術 ror fanout廣播模式是全部都能收到信息,那我要是想要有條件選擇的接收呢,需要用到direct模式 這張圖的大概意思是Exchange的類型為direct,發的error級別的消息投遞到第一個隊列,消

WebSocket實時訊息

廢話不多說,直接上程式碼,本人親自測試,可以使用。 前臺js程式碼: var websocket; var path = window.location.host+"/web-socket"; // 首先判斷是否 支援 WebSocket if('WebSocket' in windo

springboot整合rabbitMQ實現訊息

RabbitMQ訊息中介軟體元件,目前比較流行的訊息中介軟體有:RabbitMQ、RocketMQ、ActiveMQ、Kafka等。 我的程式碼中用的是RabbitMQ,先介紹幾個概念:       一:訊息佇列的特性如下: 非同步性,將耗時的同步操作通過以傳送訊息的方

web的實時技術--websocket

Python的tornado框架中,websocket允許瀏覽器和伺服器之間進行雙向,實時,持久的,全雙工通訊。協議本身使用 ws://URL的格式,如果是加密的websocket則使用的是wss://URL websocket和其他方式的比較: 1、http協

WebSocket實現實時資料到前端

@Component @EnableWebSocket public class WebSocketConfig implements WebSocketConfigurer{ @Resource goodsWebSocketHandler handler;

Spring+Websocket實現訊息

本文主要有三個步驟 1、使用者登入後建立websocket連線,預設選擇websocket連線,如果瀏覽器不支援,則使用sockjs進行模擬連線 2、建立連線後,服務端返回該使用者的未讀訊息 3、服務端進行相關操作後,推送給某一個使用者或者所有使用者新訊息 相關

spring boot 整合websocket實現實時

websocket暫時能想到的應用是瀏覽器聊天和後臺日誌實時顯示到前臺。 當後臺啟動程式後,等待前臺連線,連線上之後,前後臺就可以相互發送資料了。 先貼上pom配置: <dependency> <groupId&g