1. 程式人生 > >Netty實現WebSocket

Netty實現WebSocket

request inb date keep turn elf HA close 地址

package com.qmtt.server;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; @Service public class NettyServer { private static final Logger log = LoggerFactory.getLogger(NettyServer.class); EventLoopGroup bossGroup; EventLoopGroup workGroup; Channel channel;
// public static void main(String[] args) { // new NettyServer().run(); // } @PostConstruct public void run() { log.info("啟動netty"); bossGroup = new NioEventLoopGroup(); workGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workGroup); b.channel(NioServerSocketChannel.
class); b.childHandler(new ChildChannelHandler()); channel = b.bind(7397).sync().channel(); // channel.closeFuture().sync(); } catch (Exception e) { log.error("", e); } finally { // bossGroup.shutdownGracefully(); // workGroup.shutdownGracefully(); } } @PreDestroy public void stop() { log.info("關閉netty"); if (null == channel) { log.error("server channel is null"); } bossGroup.shutdownGracefully(); workGroup.shutdownGracefully(); channel.closeFuture().syncUninterruptibly(); bossGroup = null; workGroup = null; channel = null; } }
package com.qmtt.server;

import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;

import com.qmtt.tools.JsonUtils;
import com.qmtt.tools.SpringUtil;
import com.qmtt.websocket.GameFunction2;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultFullHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
import io.netty.util.CharsetUtil;

public class MyWebSocket2 extends SimpleChannelInboundHandler<Object> {
    private static final Logger log = LoggerFactory.getLogger(MyWebSocket2.class);
    private WebSocketServerHandshaker handshaker;

    private static Map<String, ChannelHandlerContext> webSocketMap = new Hashtable<String, ChannelHandlerContext>();

    public GameFunction2 gameFunction;

    RedisTemplate redisTemplate;

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("客戶端與服務端連接開啟");
        gameFunction = SpringUtil.getBean(GameFunction2.class);
        redisTemplate = (RedisTemplate) SpringUtil.getBean("redisTemplate");
        // 添加
        // Global.group.add(ctx.channel());
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        // 移除
        // Global.group.remove(ctx.channel());
        log.info("客戶端與服務端連接關閉");
        String key = null;
        Iterator iterator = webSocketMap.entrySet().iterator();
        while (iterator.hasNext()) {
            Map.Entry<String, ChannelHandlerContext> entry = (Entry<String, ChannelHandlerContext>) iterator.next();
            key = entry.getKey();
            if (entry.getValue().equals(ctx)) {
                key = entry.getKey();
                break;
            }
        }
        log.info("<{}>斷開連接", key);
        if (key != null) {
            webSocketMap.remove(key);
        }
        gameFunction.close(key);
    }

    @Override
    protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
        if (msg instanceof FullHttpRequest) {
            handleHttpRequest(ctx, ((FullHttpRequest) msg));
        } else if (msg instanceof WebSocketFrame) {
            handlerWebSocketFrame(ctx, (WebSocketFrame) msg);
        }
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }

    private void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
        // 判斷是否關閉鏈路的指令
        if (frame instanceof CloseWebSocketFrame) {
            log.info("連接開閉");
            handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain());
            return;
        }
        // 判斷是否ping消息
        if (frame instanceof PingWebSocketFrame) {
            ctx.channel().write(new PongWebSocketFrame(frame.content().retain()));
            return;
        }
        // 本例程僅支持文本消息,不支持二進制消息
        if (!(frame instanceof TextWebSocketFrame)) {
            log.info("不支持二進制消息");
            return;
        }
        // 返回應答消息
        String message = ((TextWebSocketFrame) frame).text();
        if (!message.contains("msgType")) {
            return;
        }
        log.info("服務端收到消息:" + message);
        try {
            Map map = JsonUtils.json2map(message);
            String msgTpye = map.get("msgType").toString();
            String openid = map.get("openid").toString();
            // 開始遊戲
            if (msgTpye.equals("start")) {
                webSocketMap.put(openid, ctx);
                // String rankValue = map.get("rankValue").toString();
                gameFunction.joinGame(openid);
                return;
            }
            // 回答問題
            if (msgTpye.equals("answer")) {
                gameFunction.answer(map);
                return;
            }
            // 遊戲結束
            if (msgTpye.equals("gameover")) {
                gameFunction.gameover(map);
                return;
            }
            // 發出邀請等待對手
            if (msgTpye.equals("wait")) {
                webSocketMap.put(openid, ctx);
                gameFunction.waitEnter(openid);
                return;
            }
            // 發出邀請對手進入
            if (msgTpye.equals("waitEnter")) {
                webSocketMap.put(openid, ctx);
                String inviteOpenid = (String) map.get("inviteOpenid");
                // 要判斷用戶是否已經開始在玩遊戲 了,是否已經離開
                gameFunction.checkUserStatus(openid, inviteOpenid);
                return;
            }
            // 發出邀請對手進入
            if (msgTpye.equals("waitStart")) {
                gameFunction.waitStart(openid);
                return;
            }
            // 再來一局
            if (msgTpye.equals("playAgain")) {
                gameFunction.playAgain(openid);
                return;
            }

        } catch (Exception e) {
            log.error("", e);
        }
        // TextWebSocketFrame tws = new TextWebSocketFrame(new Date().toString()
        // + ctx.channel().id() + ":" + request);
        // // 群發
        // Global.group.writeAndFlush(tws);
        // 返回【誰發的發給誰】
        // ctx.channel().writeAndFlush(tws);
    }

    private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) {
        if (!req.getDecoderResult().isSuccess() || (!"websocket".equals(req.headers().get("Upgrade")))) {
            sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.BAD_REQUEST));
            return;
        }
        // 註意,這條地址別被誤導了,其實這裏填寫什麽都無所謂,WS協議消息的接收不受這裏控制
        // 消息分發可以通過Req中獲取uri處理
        // WebSocketServerHandshakerFactory wsFactory = new
        // WebSocketServerHandshakerFactory("ws://127.0.0.1:7397/websocket",
        // null,
        // false);
        WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory("", null, false);
        handshaker = wsFactory.newHandshaker(req);
        if (handshaker == null) {
            WebSocketServerHandshakerFactory.sendUnsupportedWebSocketVersionResponse(ctx.channel());
        } else {
            handshaker.handshake(ctx.channel(), req);
        }
    }

    private static void sendHttpResponse(ChannelHandlerContext ctx, FullHttpRequest req, DefaultFullHttpResponse res) {
        // 返回應答給客戶端
        if (res.getStatus().code() != 200) {
            ByteBuf buf = Unpooled.copiedBuffer(res.getStatus().toString(), CharsetUtil.UTF_8);
            res.content().writeBytes(buf);
            buf.release();
        }
        // 如果是非Keep-Alive,關閉連接
        ChannelFuture f = ctx.channel().writeAndFlush(res);
        if (!isKeepAlive(req) || res.getStatus().code() != 200) {
            f.addListener(ChannelFutureListener.CLOSE);
        }
    }

    private static boolean isKeepAlive(FullHttpRequest req) {
        return false;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }

    public static Map<String, ChannelHandlerContext> getWebSocketMap() {
        return webSocketMap;
    }

    public static int sendMsg(String id, Object msg) {
        try {
            String ret = JsonUtils.toJsonStringIgnoreNull(msg);
            ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id);
            if (socket != null) {
                log.info("給<{}>發送消息:{}", id, ret);
                socket.writeAndFlush(new TextWebSocketFrame(ret));
                return 1;
            } else {
                log.info("<{}>連接不存在,不處理", id);
            }
        } catch (Exception ex) {
            log.error("", ex);
        }
        return 0;
    }

    public static int sendMsg(String id, String msg) {
        try {
            ChannelHandlerContext socket = MyWebSocket2.getWebSocketMap().get(id);
            if (socket != null) {
                log.info("給<{}>發送消息:{}", id, msg);
                socket.writeAndFlush(new TextWebSocketFrame(msg));
                return 1;
            } else {
                log.info("連接不存在,不處理");
            }
        } catch (Exception ex) {
            log.error("", ex);
        }
        return 0;
    }
}
package com.qmtt.server;

import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.stream.ChunkedWriteHandler;

public class ChildChannelHandler extends ChannelInitializer<SocketChannel> {
    @Override
    protected void initChannel(SocketChannel e) throws Exception {

        e.pipeline().addLast("http-codec", new HttpServerCodec());
        e.pipeline().addLast("aggregator", new HttpObjectAggregator(65536));
        e.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
        e.pipeline().addLast("handler", new MyWebSocket2());
    }
}

此代碼為詩詞榮耀websocket的實現,解決了tomcat實現的websocket連接不穩定的問題

Netty實現WebSocket