1. 程式人生 > >SpringBoot整合Netty之Websocket

SpringBoot整合Netty之Websocket

前後端通過websocket通訊進行聊天~ 核心程式碼整理如下:


netty元件

@Component
public class NettyBooter implements ApplicationListener<ContextRefreshedEvent> {
    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        if(contextRefreshedEvent.getApplicationContext().getParent() == null){
            try {
                //開啟WebSocket服務
                WSServer.getInstance().start();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
}

WSServer.java

/**
 * 考慮反射:
 *   由於在呼叫 SingletonHolder.instance 的時候,才會對單例進行初始化,而且通過反射,是不能從外部類獲取內部類的屬性的。
 *   所以這種形式,很好的避免了反射入侵。
 * 考慮多執行緒:
 *   由於靜態內部類的特性,只有在其被第一次引用的時候才會被載入,所以可以保證其執行緒安全性。
 * 不需要傳參的情況下 優先考慮靜態內部類
 */
@Component
public class WSServer {

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    private  ServerBootstrap server;
    private ChannelFuture future;

    private static class SingletionWSServer{
        static final WSServer instance = new WSServer();
    }

    public static WSServer getInstance(){
        return SingletionWSServer.instance;
    }

    public WSServer() {
        bossGroup = new NioEventLoopGroup();
       workerGroup =new NioEventLoopGroup();
        server = new ServerBootstrap();

       server.group(bossGroup, workerGroup)
                .channel(NioServerSocketChannel.class)
                .childHandler(new WSServerInitialzer());//自定義初始化handler容器
    }

    public void start(){
        //自定義埠
        this.future = server.bind(8088);
    }

}

初始化handler容器類

public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {

        ChannelPipeline pipeline = socketChannel.pipeline();

        //websocket 基於http協議 所以要有http編解碼器
        pipeline.addLast(new HttpServerCodec());

        //對寫大資料流的支援
        pipeline.addLast(new ChunkedWriteHandler());

        //對httpMessage進行聚合,聚合成FullHttpRequest或FullHttpResponse
        //幾乎在netty中的程式設計 ,都會使用到此handler
        pipeline.addLast(new HttpObjectAggregator(1024*64));

        //====================以上是使用支援http協議====

        //===================增加心跳===================
        //如果是讀寫空閒  不處理
        pipeline.addLast(new IdleStateHandler(8,10,12));
       //自定義空閒狀態檢測
        pipeline.addLast(new HeartBeatHandler());


        /*
        * websocket 伺服器處理的協議 ,用於指定給客戶端連線訪問的路由 :/ws
        * 本handler 會幫你處理一些繁重的複雜的事
        * 會幫你處理握手動作 :handshaking (close,ping,pong)ping+pong=心跳
        * 對於websocket來講, 都是以frams進行傳輸的不同的資料型別對應的frames也不同
        * */
        pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));

        //自定義handler
        pipeline.addLast(new ChatHandler());

    }
}

自定義的聊天handelr,其中channelRead0中很多東西沒有提供,自己看註釋理解吧 ,反正收到前端傳來的訊息,隨你怎麼處理,我只是提供一種思路而已

public class ChatHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> {

    static public ChannelGroup clients =
            new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception {

        Channel currentChannel = ctx.channel();
        //獲取客戶端傳輸過來的訊息
        String content = msg.text();

        System.out.println("接收的資料:" + content);
        //1.獲取客戶端傳送來的訊息
        DataContent dataContent = JsonUtils.jsonToPojo(content, DataContent.class);
        Integer action = dataContent.getAction();
        //2判斷訊息的型別,更具不同的型別來處理不同的業務
        if (action == MsgActionEnum.CONNECT.getType()) {
            //2.1當websocket 第一次open的時候 初始化channel 並把userid和channel進行繫結
            String senderId = dataContent.getMixinMsg().getSenderId();
            UserChannelRel.put(senderId, currentChannel);
        } else if (action == MsgActionEnum.CHAT.getType()) {
            //2.2聊天型別的訊息
            MixinMsg mixinMsg = dataContent.getMixinMsg();
            String msgText = mixinMsg.getMsg();
            String recevierId = mixinMsg.getReceiverId();
            String senderId = mixinMsg.getSenderId();

            //儲存訊息到資料庫,並且標記為未簽收
            IChatMsgService chatMsgService = (IChatMsgService) SpringUtil.getBean("chatMsgServiceImpl");

            String msgId = chatMsgService.saveMsg(mixinMsg);
            mixinMsg.setMsgId(msgId);

            //構造傳送的訊息
            DataContent dataContentMsg = new DataContent();
            dataContentMsg.setMixinMsg(mixinMsg);
            //傳送訊息
            Channel recvchannel = UserChannelRel.get(recevierId);
            //從全域性使用者channel關係中獲取接收方的channel
            if (recvchannel == null) {
                //TODD channel為空代表使用者離線 推送訊息
            } else {
                //當channel 不為空的時候 從ChannelGroup去查詢channnel是否存在\
                Channel findChannel = clients.find(recvchannel.id());
                if (findChannel == null) {
                    //TODD channel為空代表使用者離線 推送訊息
                } else {
                    //使用者線上
                    recvchannel.writeAndFlush(new TextWebSocketFrame(
                            JsonUtils.objectToJson(dataContentMsg)
                    ));
                }
            }
        } else if (action == MsgActionEnum.SIGNED.getType()) {
             //批量簽收訊息
                ...

        } else if (action == MsgActionEnum.KEEPALIVE.getType()) {
            //2.2心跳型別的訊息
            System.out.println("收到【" + ctx.channel() + "】的心跳包!");
        }


        /*
        //群發
        TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString() + "--"
                + ctx.channel().id() + "===》" + content);


        for (Channel channel : clients) {
            channel.writeAndFlush(tws);
        }*/

//      下面這個方法 和上面的for迴圈 一致
//       clients.writeAndFlush(tws);

    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        clients.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
        //ChannelGroup會自動移除
        clients.remove(ctx.channel());
    }


    //異常處理
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.channel().close();
        clients.remove(ctx.channel());
    }
}

心跳類

//處理心跳
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
    
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.READER_IDLE) {
                System.out.println("進入讀空閒。。。");
            }else if(state == IdleState.WRITER_IDLE){
                System.out.println("進入寫空閒。。。");
            }else if(state == IdleState.ALL_IDLE){
                //關閉無用的channel 以防資源浪費
                Channel channel = ctx.channel();
                channel.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }
}

前端 js 實現websocket客戶端 :https://blog.csdn.net/wangzhanzheng/article/details/78603532

改造實現如下,僅供參考

// 構建聊天業務CHAT  WEBSocket
			window.CHAT = {
				socket: null,
				init: function() {
					if (window.WebSocket) {

						// 如果當前的狀態已經連線,那就不需要重複初始化websocket
						if (CHAT.socket != null &&
							CHAT.socket != undefined &&
							CHAT.socket.readyState == WebSocket.OPEN) {
							return false;
						}

						CHAT.socket = new WebSocket(app.nettyServerUrl);
						CHAT.socket.onopen = CHAT.wsopen,
							CHAT.socket.onclose = CHAT.wsclose,
							CHAT.socket.onerror = CHAT.wserror,
							CHAT.socket.onmessage = CHAT.wsmessage;
					} else {
						alert("不支援ws通訊...");
					}
				},
				chat: function(msg) {

					// 如果當前websocket的狀態是已開啟,則直接傳送, 否則重連
					if (CHAT.socket != null &&
						CHAT.socket != undefined &&
						CHAT.socket.readyState == WebSocket.OPEN) {
						CHAT.socket.send(msg);
					} else {
						// 重連websocket
						CHAT.init();
						setTimeout("CHAT.reChat('" + msg + "')", "1000");
					}
					// 渲染快照列表進行展示
				
				},
				reChat: function(msg) {
					console.log("訊息重新發送...");
					CHAT.socket.send(msg);
				},
				wsopen: function() {
					console.log("websocket連線已建立...");

				
					// 構建Msg
				
					// 構建DataContent
				
					// 傳送websocket
					CHAT.chat(JSON.stringify(dataContent));

					// 每次連線之後,獲取使用者的未讀未簽收訊息列表
				

					// 定時傳送心跳
					setInterval("CHAT.keepalive()", 10000);
				},
				wsmessage: function(e) {
					console.log("接受到訊息:" + e.data);
	
				},
				wsclose: function() {
					console.log("連線關閉...");
				},
				wserror: function() {
					console.log("發生錯誤...");
				},
				signMsgList: function(unSignedMsgIds) {
					// 批量簽收 
                                    ...	
				},
				keepalive: function() {				
					// 傳送心跳
				        ...
			            
				}
			};