1. 程式人生 > >[ Spring Boot ] 整合 Websocket 實現訊息推送框架的設計筆記

[ Spring Boot ] 整合 Websocket 實現訊息推送框架的設計筆記

前段時間,專案中用Websocket實現了一套後臺向前端推送的Service層搭建,感興趣的童鞋可以瞭解下^_^

Maven pom

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

Configuration

package cn.com.showclear.config;

import org.springframework.context.annotation.Configuration
; import org.springframework.messaging.simp.config.MessageBrokerRegistry; import org.springframework.web.socket.config.annotation.AbstractWebSocketMessageBrokerConfigurer; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry
; /** * Websocket 整合Springboot 開發 * <p> * Target: * > 1、預案刪除後臺推送預案執行介面自動重新整理 * * @author YF-XIACHAOYANG * @date 2017/11/23 13:49 */ @Configuration //EnableWebSocketMessageBroker註解表示開啟使用STOMP協議來傳輸基於代理的訊息,Broker就是代理的意思。 @EnableWebSocketMessageBroker public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer
{ /** * 註冊STOMP協議節點,同時指定使用SockJS協議 * @param registry */ @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint("/endpointSang").withSockJS(); } /** * 配置訊息代理,由於我們是實現推送功能,這裡的訊息代理是/msg/... * @param registry */ @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/msg/"); } }

Controller

訊息體

1、RequestMessage
package cn.com.showclear.common.utils;

/**
 * 瀏覽器傳送訊息的接收類
 * 瀏覽器傳送來的訊息用這個類來接收
 * @author YF-XIACHAOYANG
 * @date 2017/11/23 14:30
 */
public class RequestMessage {
    private String name;

    public String getName() {
        return name;
    }
}
2、ResponseMessage
package cn.com.showclear.common.utils;

/**
 * 響應訊息類
 * 伺服器返回給瀏覽器的訊息由這個類來承載
 * @author YF-XIACHAOYANG
 * @date 2017/11/23 14:30
 */
public class ResponseMessage {
    private String responseMessage;

    public ResponseMessage(String responseMessage) {
        this.responseMessage = responseMessage;
    }

    public String getResponseMessage() {
        return responseMessage;
    }
}

請求控制器

package cn.com.showclear.common.controller;

import cn.com.showclear.common.utils.RequestMessage;
import cn.com.showclear.common.utils.ResponseMessage;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.handler.annotation.SendTo;
import org.springframework.stereotype.Controller;

/**
 * Websocket 請求控制器
 * 負責後臺接收推送過來的訊息並完成推送的互動
 * @author YF-XIACHAOYANG
 * @date 2017/11/23 14:26
 */
@Controller
public class WsController {

    //接收主題
    @MessageMapping("/hello")
    //推送主題
    @SendTo("/msg/resp")
    public ResponseMessage say(RequestMessage message) {
        System.out.println(message.getName());
        return new ResponseMessage("hello," + message.getName() + " !");
    }
}

前端指令碼

1、STOMP協議的客戶端指令碼stomp.js、
2、SockJS的客戶端指令碼sock.js
3、jQuery

建立連線和主題推送

/*連線、訂閱主題回撥*/
 function connect() {
        var socket = new SockJS('/endpointSang');
        stompClient = Stomp.over(socket);
        stompClient.connect({}, function (frame) {
            setConnected(true);
            console.log('Connected:' + frame);
            stompClient.subscribe('/msg/resp', function (response) {
                showResponse(JSON.parse(response.body).responseMessage);
            })
        });
    }

/*推送*/
  function sendName() {
        var name = $('#name').val();
        console.log('name:' + name);
        stompClient.send("/hello", {}, JSON.stringify({'name': name}));
    }

推送框架設計

不使用@SendTo註解,通過SimpMessagingTemplate完成訊息推送服務層的搭建,

1@SendTo 適合放在WebsocketListener[@Controller]中監聽指定指定訊息代理並完成任務轉發
2、notifyService 適合在不同控制層[@Controller]中直接完成推送服務

一、後端推送服務搭建

service

/**
 * 訊息推送服務
 */
interface NOTIFY{
    /**
     * 通知訊息
     * @param noticeVO
     */
    void notice(NoticeVO noticeVO);
}

impl

package cn.com.showclear.plan.impl.common;

import cn.com.showclear.plan.pojo.common.NoticeVO;
import cn.com.showclear.plan.service.common.BaseServices;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

/**
 * 訊息推送服務
 * @author YF-XIACHAOYANG
 * @date 2017/11/23 15:36
 */
@Service
public class NotificationServiceImpl implements BaseServices.NOTIFY {

    private static final Logger log = LoggerFactory.getLogger(NotificationServiceImpl.class);

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * 通知訊息傳送
     *
     * @param noticeVO
     */
    @Override
    public void notice(NoticeVO noticeVO) {
        try {
            messagingTemplate.convertAndSend(noticeVO.getSubject(), noticeVO.getData());
        } catch (Exception e) {
            log.error("notice msg error.", e);
        }
    }
}

NoticeVO 訊息體基本物件

package cn.com.showclear.plan.pojo.common;

/**
 * 訊息通知物件
 *
 * @author LuLihong
 * @date 2017-08-30
 **/
public class NoticeVO {
    private final String subject;
    private final Object data;

    public NoticeVO(String subject, Object data) {
        this.subject = subject;
        this.data = data;
    }

    public Object getData() {
        return data;
    }

    public String getSubject() {
        return subject;
    }
}

eg

......

@Autowired
private BaseServices.NOTIFY notifyService;

......

/**
 * 預案結束
 *
 * @return
 */
@RequestMapping(value = "/finishPlan", method = RequestMethod.POST)
public RespMapJson finishPlan(Integer planReId) {
    DataSourceTypeManager.set(DataSources.PLAN);
    RespMapJson resp = psmRunService.finishPlan(planReId);
    if (resp.getCode() == 0) {
        //通知預案在執行介面重新整理
        resp.put("isRefresh", true);
        notifyService.notice(new NoticeVO(NoticeSubject.MSG_REFRESH, resp.getResp()));
    }
    return resp;
}

二、前端推送管理和回撥監聽

amd & require

/**
 * require 通用配置
 * @author Yiyuery
 */
require.config({
    baseUrl: window.main.contextPath,
    paths: {
      
        jquery: "js/lib/jquery/jquery-1.9.1",

        //websocket
        stomp: "js/websocket/stomp",
        sockjs: "js/websocket/sockjs.min",

        //ws-utils
        'scooper-notice': 'js/scooper/scooper.notice',
        'msg-ws':'js/websocket/msg-websocket',
        
        //提示元件
        layer: 'js/lib/layer/layer',
        
        //自定義元件
        capsule: 'js/lib/capsule/capsule.util',
    },
    /*定義模組依賴*/
    shim: {
        layer: { deps: ['jquery'] },
        capsule: { deps: ['jquery', 'layer', 'pager'] }
    }
});

推送管理

/**
 * Created by LuLihong on 2017/8/30.
 */
window.scooper = window.scooper || {};
/**
 * 訊息通知
 * @type {{}}
 */
window.scooper.notice = {
    /**
     * 主題
     */
    subjects: {
        /**預案執行數目變更通知主題*/
        refresh: '/msg/refresh',
        /**測試監聽主題**/
        test:'/msg/test',
        /**測試通道佔用**/
        topic:'/topic/resp'
    },

    /**
     * 主題監聽器
     */
    listeners: {},

    /**
     * 獲取需要監聽的主題
     * @returns {[*,*,*]}
     */
    getSubjects: function() {
        return [this.subjects.refresh,this.subjects.test,this.subjects.topic];
    },

    /**
     * 新增監聽器
     * @param subject
     * @param listener
     */
    addListener: function(subject, listener) {
        this.listeners[subject] = listener;
    },

    /**
     * 刪除監聽器
     * @param subject
     */
    removeListener: function(subject) {
        delete this.listeners[subject];
    },

    /**
     * 獲取監聽器
     * @param subject
     * @returns {*}
     */
    getListener: function(subject) {
        return this.listeners[subject];
    },

    /**
     * 接收到通知
     * @param subject
     * @param notice
     */
    recvNotice: function(subject, notice) {
        window.console.log('recv notice: ' + subject + '=' + notice);
        var listener = this.listeners[subject];
        if (listener) {
            var noticeObj = JSON.parse(notice);
            listener(noticeObj);

        }
    }
};

推送核心模組

/**
 * 獲取後臺的通知訊息,以websocket方式獲取。
 * Created by LuLihong on 2017/8/30.
 */
define(['jquery', 'scooper-notice', 'stomp', 'sockjs'], function ($) {

    var stompClient = null;

    /**
     * 定義模組
     * @type {{}}
     */
    var MSGWS = {
        /**
         * 建立SocKJS例項並獲取websocket連線
         */
        connect: function (fn) {
            //申明連線的SockJSendpoint名稱:與後臺WebsocketConfig保持一致
            var socket = new SockJS(window.main.contextPath + '/endpointSang', null, {rtt: 5000});
            //使用STOMP來建立WebSocket客戶端
            stompClient = Stomp.over(socket);
            stompClient.connect({}, function (frame) {
                var subjects = window.scooper.notice.getSubjects();
                $.each(subjects, function (i, subject) {
                    /**
                     * 訂閱/msg/resp等主題傳送來的訊息,分發事件
                     * Controller中的say方法上新增的@SendTo註解的引數
                     * stompClient中的send方法表示傳送一條訊息到服務端
                     */
                    stompClient.subscribe(subject, function (resp) {
                        window.scooper.notice.recvNotice(subject, resp.body);
                    });
                });
            });
            if (fn instanceof Function) {
                fn();
            }
        },
        /**
         * 斷開連線
         */
        disconnect: function () {
            if (stompClient != null) {
                stompClient.disconnect();
            }

            window.console.log('Disconnected');
        },
        /**
         * 檢查連線狀態
         */
        checkState: function () {
            if (stompClient == null || !stompClient.connected) {
                MSGWS.connect();
            }
        },
        /**
         * 連線保持定時監聽
         */
        keepListenerTimer: function () {
            setInterval(MSGWS.checkState, 5000);
        },
        /**
         * 初始化
         */
        init: function () {
            MSGWS.connect(function () {
                MSGWS.keepListenerTimer();
            });

        }
    };

    /*立即執行函式,完成連線*/
    MSGWS.init();

    /**
     * 對外開放部分介面
     */
    return {
        /**
         * 關閉連線
         */
        disconnect: MSGWS.disconnect
    }
});

註冊回撥監聽

/*頭部引入模組*/
define(["require", "exports", "jquery", "avalon", "capsule", "layer", 'msg-ws'], function (require, exports, $, avalon, capsule, layer, msgWs)


/**
 * 註冊websocket回撥主題
 */
function regWebSocketListener() {
    window.scooper.notice.addListener('/msg/refresh', wsNotify);
},
    
/**
 * websocket訊息通知
 * @param msg
 */
function wsNotify(isRefresh) {
    if (isRefresh) {
        layer.msg('執行中預案數目發生變更,正在重新載入...');
        capsule.baseUtil.delay(2, function () {
            window.location.reload();
        });
    }
}