1. 程式人生 > >netty實現TCP長連線

netty實現TCP長連線

所用jar包

netty-all-4.1.30.Final.jar 密碼:rzwe

NettyConfig.java,存放連線的客戶端

 1 import io.netty.channel.group.ChannelGroup;
 2 import io.netty.channel.group.DefaultChannelGroup;
 3 import io.netty.util.concurrent.GlobalEventExecutor;
 4 
 5 public class NettyConfig {
 6     
 7     /**
 8      * 儲存每一個客戶端接入進來時的channel物件
9 */ 10 public static ChannelGroup group = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE); 11 12 }

 

Server.java,netty配置資訊

 1 import io.netty.bootstrap.ServerBootstrap;
 2 import io.netty.channel.ChannelFuture;
 3 import io.netty.channel.ChannelInitializer;
 4 import
io.netty.channel.ChannelOption; 5 import io.netty.channel.ChannelPipeline; 6 import io.netty.channel.EventLoopGroup; 7 import io.netty.channel.nio.NioEventLoopGroup; 8 import io.netty.channel.socket.ServerSocketChannel; 9 import io.netty.channel.socket.SocketChannel; 10 import io.netty.channel.socket.nio.NioServerSocketChannel;
11 12 public class Server { 13 private int port; 14 private ServerSocketChannel serverSocketChannel; 15 16 public Server(int port){ 17 this.port = port; 18 bind(); 19 } 20 21 private void bind() { 22 Thread thread = new Thread(new Runnable() { 23 @Override 24 public void run() { 25 //服務端要建立兩個group,一個負責接收客戶端的連線,一個負責處理資料傳輸 26 //連線處理group 27 EventLoopGroup boss = new NioEventLoopGroup(); 28 //事件處理group 29 EventLoopGroup worker = new NioEventLoopGroup(); 30 ServerBootstrap bootstrap = new ServerBootstrap(); 31 // 繫結處理group 32 bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) 33 //保持連線數 34 .option(ChannelOption.SO_BACKLOG, 300) 35 //有資料立即傳送 36 .option(ChannelOption.TCP_NODELAY, true) 37 //保持連線 38 .childOption(ChannelOption.SO_KEEPALIVE, true) 39 //處理新連線 40 .childHandler(new ChannelInitializer<SocketChannel>() { 41 @Override 42 protected void initChannel(SocketChannel sc) throws Exception { 43 // 增加任務處理 44 ChannelPipeline p = sc.pipeline(); 45 p.addLast( 46 // //使用了netty自帶的編碼器和解碼器 47 // new StringDecoder(), 48 // new StringEncoder(), 49 //心跳檢測,讀超時,寫超時,讀寫超時 50 //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS), 51 //自定義的處理器 52 new ServerHandler()); 53 } 54 }); 55 56 //繫結埠,同步等待成功 57 ChannelFuture future; 58 try { 59 future = bootstrap.bind(port).sync(); 60 if (future.isSuccess()) { 61 serverSocketChannel = (ServerSocketChannel) future.channel(); 62 System.out.println("服務端啟動成功,埠:"+port); 63 } else { 64 System.out.println("服務端啟動失敗!"); 65 } 66 67 //等待服務監聽埠關閉,就是由於這裡會將執行緒阻塞,導致無法傳送資訊,所以我這裡開了執行緒 68 future.channel().closeFuture().sync(); 69 } catch (Exception e) { 70 e.printStackTrace(); 71 } 72 finally { 73 //優雅地退出,釋放執行緒池資源 74 boss.shutdownGracefully(); 75 worker.shutdownGracefully(); 76 } 77 } 78 }); 79 thread.start(); 80 } 81 82 public void sendMessage(Object msg){ 83 if(serverSocketChannel != null){ 84 serverSocketChannel.writeAndFlush(msg); 85 } 86 } 87 88 public static void main(String[] args) { 89 Server server = new Server(8088); 90 } 91 }

 

ServerHandler.java,業務處理

 1 import io.netty.channel.ChannelHandlerContext;
 2 import io.netty.channel.ChannelInboundHandlerAdapter;
 3 
 4 public class ServerHandler extends ChannelInboundHandlerAdapter {    
 5       
 6     /**
 7      * 客戶端與服務端建立連線的時候呼叫
 8      */
 9     @Override
10     public void channelActive(ChannelHandlerContext ctx) throws Exception {
11         System.out.println("客戶端與服務端連線開始...");
12         NettyConfig.group.add(ctx.channel());
13     }
14  
15     /**
16      * 客戶端與服務端斷開連線時呼叫
17      */
18     @Override
19     public void channelInactive(ChannelHandlerContext ctx) throws Exception {
20         System.out.println("客戶端與服務端連線關閉...");
21         NettyConfig.group.remove(ctx.channel());
22     }
23  
24     /**
25      * 服務端接收客戶端傳送過來的資料結束之後呼叫
26      */
27     @Override
28     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
29         ctx.flush();
30         System.out.println("資訊接收完畢...");
31     }
32  
33     /**
34      * 工程出現異常的時候呼叫
35      */
36     @Override
37     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
38         cause.printStackTrace();
39         ctx.close();
40     }
41  
42     /**
43      * 服務端處理客戶端websocket請求的核心方法,這裡接收了客戶端發來的資訊
44      */
45     @Override
46     public void channelRead(ChannelHandlerContext channelHandlerContext, Object info) throws Exception {
47         System.out.println("接收到了:"+info);
48 ByteBuf buf = (ByteBuf) info;
49 byte[] req = new byte[buf.readableBytes()];
50 buf.readBytes(req);
51 String body = new String(req, "UTF-8");
52 System.out.println("接收客戶端資料:" + body);
53 ByteBuf pingMessage = Unpooled.buffer();
54 pingMessage.writeBytes(req);
55 channelHandlerContext.writeAndFlush(pingMessage);
56 
57         
58         //服務端使用這個就能向 每個連線上來的客戶端群發訊息
59         //NettyConfig.group.writeAndFlush(info);
60 //        Iterator<Channel> iterator = NettyConfig.group.iterator();
61 //        while(iterator.hasNext()){
62 //            //打印出所有客戶端的遠端地址
63 //            System.out.println((iterator.next()).remoteAddress());
64 //        }
65     }
66     
67     
68 }

 

使用網路除錯助手進行連線測試 下載地址

https://www.wanpishe.top/detail?blogId=fc62fce2-020a-4815-8388-0903e4a54e1f