1. 程式人生 > >Springboot2(26)整合netty實現websocket通訊

Springboot2(26)整合netty實現websocket通訊

原始碼地址

springboot2教程系列
其它netty檔案有部落格

Springboot2(24)整合netty實現http服務(類似SpingMvc的contoller層實現)

Springboot2(25)整合netty實現檔案傳輸

Springboot2(26)整合netty實現websocket通訊

Springboot2(27)整合netty實現反向代理(內網穿透)

實現websocket通訊,和廣播訊息

新增依賴

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.1.Final</version>
</dependency>
<dependency>
    <groupId>commons-lang</groupId>
    <artifactId>commons-lang</artifactId>
    <version>${commons.lang.version}</version>
</dependency>

排除tomcat的依賴

Netty Http服務端編寫

handler 處理類

@Component
@Slf4j
@ChannelHandler.Sharable //@Sharable 註解用來說明ChannelHandler是否可以在多個channel直接共享使用
@ConditionalOnProperty(  //配置檔案屬性是否為true
		value = {"netty.ws.enabled"},
		matchIfMissing = false
)
public class WsServerHandler extends ChannelInboundHandlerAdapter
{ @Autowired NettyWsProperties nettyWsProperties; public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); private WebSocketServerHandshaker handshaker; //websocket握手升級繫結頁面 String wsFactoryUri = ""; @Value("${netty.ws.endPoint:/ws}") private String wsUri;
//static Set<Channel> channelSet = new HashSet<>(); /* * 握手建立 */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); channels.add(incoming); } /* * 握手取消 */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { Channel incoming = ctx.channel(); channels.remove(incoming); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof FullHttpRequest) { handleHttpRequest(ctx, (FullHttpRequest) msg); } else if (msg instanceof WebSocketFrame) { handlerWebSocketFrame(ctx, (WebSocketFrame) msg); } } //websocket訊息處理(只支援文字) public void handlerWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception { // 關閉請求 if (frame instanceof CloseWebSocketFrame) { handshaker.close(ctx.channel(), (CloseWebSocketFrame) frame.retain()); return; } // ping請求 if (frame instanceof PingWebSocketFrame) { ctx.channel().write(new PongWebSocketFrame(frame.content().retain())); return; } // 只支援文字格式,不支援二進位制訊息 if (frame instanceof TextWebSocketFrame) { //接收到的訊息 String requestmsg = ((TextWebSocketFrame) frame).text(); TextWebSocketFrame tws = new TextWebSocketFrame(requestmsg); channels.writeAndFlush(tws); } } // 第一次請求是http請求,請求頭包括ws的資訊 public void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest request) throws Exception { // 如果HTTP解碼失敗,返回HTTP異常 if (request instanceof HttpRequest) { HttpMethod method = request.getMethod(); // 如果是websocket請求就握手升級 if (wsUri.equalsIgnoreCase(request.getUri())) { System.out.println(" req instanceof HttpRequest"); WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory( wsFactoryUri, null, false); handshaker = wsFactory.newHandshaker(request); if (handshaker == null) { WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel()); } else { } handshaker.handshake(ctx.channel(), request); } } } // 異常處理,netty預設是關閉channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if (event.state() == IdleState.READER_IDLE) { // 讀資料超時 } else if (event.state() == IdleState.WRITER_IDLE) { // 寫資料超時 } else if (event.state() == IdleState.ALL_IDLE) { // 通道長時間沒有讀寫,服務端主動斷開連結 ctx.close(); } } else { super.userEventTriggered(ctx, evt); } } }

ChannelPipeline 實現

@Component
@ConditionalOnProperty(  //配置檔案屬性是否為true
		value = {"netty.ws.enabled"},
		matchIfMissing = false
)
public class WsPipeline  extends ChannelInitializer<SocketChannel>{
	
	@Autowired
	WsServerHandler wsServerHandler;
	
	private static final int READ_IDEL_TIME_OUT = 3; // 讀超時
	private static final int WRITE_IDEL_TIME_OUT = 4;// 寫超時
	private static final int ALL_IDEL_TIME_OUT = 5; // 所有超時
	
	@Override
	protected void initChannel(SocketChannel ch) throws Exception {

		ChannelPipeline p = ch.pipeline();
		p.addLast(new IdleStateHandler(READ_IDEL_TIME_OUT,WRITE_IDEL_TIME_OUT, ALL_IDEL_TIME_OUT, TimeUnit.MINUTES));
    	p.addLast("http-codec", new HttpServerCodec());
    	p.addLast("aggregator", new HttpObjectAggregator(65536));
    	p.addLast("http-chunked", new ChunkedWriteHandler());
    	p.addLast("handler",wsServerHandler);
	}

}

服務實現

@Configuration
@EnableConfigurationProperties({NettyWsProperties.class})
@ConditionalOnProperty(  //配置檔案屬性是否為true
		value = {"netty.ws.enabled"},
		matchIfMissing = false
)
@Slf4j
public class WsServer {
	
	    @Autowired
	    WsPipeline wsPipeline;

	    @Autowired
	    NettyWsProperties nettyWsProperties;
	    
	    @Bean("starWsServer")
	    public String start() {
	        // 準備配置
	        // HttpConfiguration.me().setContextPath(contextPath).setWebDir(webDir).config();
	        // 啟動伺服器
	       Thread thread =  new Thread(() -> {
	    	    NioEventLoopGroup bossGroup = new NioEventLoopGroup(nettyWsProperties.getBossThreads());
		        NioEventLoopGroup workerGroup = new NioEventLoopGroup(nettyWsProperties.getWorkThreads());
	            try {
	            	log.info("start netty [WebSocket] server ,port: " + nettyWsProperties.getPort());
	                ServerBootstrap boot = new ServerBootstrap();
	                options(boot).group(bossGroup, workerGroup)
	                        .channel(NioServerSocketChannel.class)
	                        .handler(new LoggingHandler(LogLevel.INFO))
	                        .childHandler(wsPipeline);
	                Channel ch = null;
	                //是否繫結IP
	                if(StringUtils.isNotEmpty(nettyWsProperties.getBindIp())){
	                	ch = boot.bind(nettyWsProperties.getBindIp(),nettyWsProperties.getPort()).sync().channel();
	                }else{
	                	ch = boot.bind(nettyWsProperties.getPort()).sync().channel();
	                }
	                ch.closeFuture().sync();
	            } catch (InterruptedException e) {
	                log.error("啟動NettyServer錯誤", e);
	            } finally {
	                bossGroup.shutdownGracefully();
	                workerGroup.shutdownGracefully();
	            }
	        });
	        thread.setName("Ws_Server");
	        thread.start();
	        return "ws start";
	    }
	    
	    
	    private ServerBootstrap options(ServerBootstrap boot) {
	    	boot.option(ChannelOption.SO_BACKLOG, 1024)
			    .option(ChannelOption.TCP_NODELAY, true)
			    .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
	        return boot;
	    }

}

啟動配置

---application.yml
spring.profiles.active: ws

---application-ws.yml
netty:
   ws:
     enabled: true
     port: 9988
     endPoint: /ws

測試

在瀏覽器開啟多個http://127.0.0.1:8080/socket.html

在這裡插入圖片描述