1. 程式人生 > >3個netty5的例子,簡單介紹netty的用法

3個netty5的例子,簡單介紹netty的用法

這是一個netty快速入門的例子,也是我的學習筆記,比較簡單,翻譯於官方的文件整理後把所有程式碼註釋放在每一行程式碼中間,簡單明瞭地介紹一些基礎的用法。

   首頁這是基於netty5的例子,如果需要使用請依賴netty5的包。maven引用方式

1 <dependency>
2 <groupId>io.netty</groupId>
3 <artifactId>netty-all</artifactId>
4 <version>5.0.0.Alpha2</version>
5
</dependency>

0.Netty Server

package com.tjbsl.netty.demo0.server; import com.tjbsl.netty.demo3.time.TimeServerHandler; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /**  * 處理資料  */ public class NettyServer {     private int port;     public NettyServer(int port) {         this.port = port;     }     public void run() throws Exception {         /***          * NioEventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器,          * Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。          * 在這個例子中我們實現了一個服務端的應用,          * 因此會有2個NioEventLoopGroup會被使用。          * 第一個經常被叫做‘boss’,用來接收進來的連線。          * 第二個經常被叫做‘worker’,用來處理已經被接收的連線,          * 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。          * 如何知道多少個執行緒已經被使用,如何對映到已經建立的Channels上都需要依賴於EventLoopGroup的實現,          * 並且可以通過建構函式來配置他們的關係。          */         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();         System.out.println("準備執行埠:" + port);         try {             /**              * ServerBootstrap 是一個啟動NIO服務的輔助啟動類              * 你可以在這個服務中直接使用Channel              */             ServerBootstrap b = new ServerBootstrap();             /**              * 這一步是必須的,如果沒有設定group將會報java.lang.IllegalStateException: group not set異常              */             b = b.group(bossGroup, workerGroup);             /***              * ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連線              * 這裡告訴Channel如何獲取新的連線.              */             b = b.channel(NioServerSocketChannel.class);             /***              * 這裡的事件處理類經常會被用來處理一個最近的已經接收的Channel。              * ChannelInitializer是一個特殊的處理類,              * 他的目的是幫助使用者配置一個新的Channel。              * 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel              * 或者其對應的ChannelPipeline來實現你的網路程式。              * 當你的程式變的複雜時,可能你會增加更多的處理類到pipline上,              * 然後提取這些匿名類到最頂層的類上。              */             b = b.childHandler(new ChannelInitializer<SocketChannel>() { // (4)                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                    //ch.pipeline().addLast(new DiscardServerHandler());//demo1.discard                    //ch.pipeline().addLast(new ResponseServerHandler());//demo2.echo                    ch.pipeline().addLast(new TimeServerHandler());//demo3.time                 }             });             /***              * 你可以設定這裡指定的通道實現的配置引數。              * 我們正在寫一個TCP/IP的服務端,              * 因此我們被允許設定socket的引數選項比如tcpNoDelay和keepAlive。              * 請參考ChannelOption和詳細的ChannelConfig實現的介面文件以此可以對ChannelOptions的有一個大概的認識。              */             b = b.option(ChannelOption.SO_BACKLOG, 128);             /***              * option()是提供給NioServerSocketChannel用來接收進來的連線。              * childOption()是提供給由父管道ServerChannel接收到的連線,              * 在這個例子中也是NioServerSocketChannel。              */             b = b.childOption(ChannelOption.SO_KEEPALIVE, true);             /***              * 繫結埠並啟動去接收進來的連線              */             ChannelFuture f = b.bind(port).sync();             /**              * 這裡會一直等待,直到socket被關閉              */             f.channel().closeFuture().sync();         } finally {             /***              * 優雅關閉              */             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();         }     }     public static void main(String[] args) throws Exception {         int port;         if (args.length > 0) {             port = Integer.parseInt(args[0]);         } else {             port = 8000;         }         new NettyServer(port).run();         //通過cmd視窗的telnet 127.0.0.1 8000執行     } }

1.DISCARD服務(丟棄服務,指的是會忽略所有接收的資料的一種協議)

package com.tjbsl.netty.demo1.discard; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import io.netty.util.ReferenceCountUtil; /**  * 服務端處理通道.這裡只是列印一下請求的內容,並不對請求進行任何的響應  * DiscardServerHandler 繼承自 ChannelHandlerAdapter,  * 這個類實現了ChannelHandler介面,  * ChannelHandler提供了許多事件處理的介面方法,  * 然後你可以覆蓋這些方法。  * 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。  *  */ public class DiscardServerHandler extends ChannelHandlerAdapter {     /***      * 這裡我們覆蓋了chanelRead()事件處理方法。      * 每當從客戶端收到新的資料時,      * 這個方法會在收到訊息時被呼叫,      * 這個例子中,收到的訊息的型別是ByteBuf      * @param ctx 通道處理的上下文資訊      * @param msg 接收的訊息      */     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) {         try {             ByteBuf in = (ByteBuf) msg;           /*  while (in.isReadable()) {                 System.out.print((char) in.readByte());                 System.out.flush();             }*/             //這一句和上面註釋的的效果都是列印輸入的字元             System.out.println(in.toString(CharsetUtil.US_ASCII));         }finally {             /**              * ByteBuf是一個引用計數物件,這個物件必須顯示地呼叫release()方法來釋放。              * 請記住處理器的職責是釋放所有傳遞到處理器的引用計數物件。              */             ReferenceCountUtil.release(msg);         }     }     /***      * 這個方法會在發生異常時觸發      * @param ctx      * @param cause      */     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         /***          * 發生異常後,關閉連線          */         cause.printStackTrace();         ctx.close();     } }

以上是一個丟棄服務的處理方式,你可以執行後通過telnet來發送訊息,來檢視是否正常執行,注意console裡會列印你的輸入內容。

2.ECHO服務(響應式協議)

    到目前為止,我們雖然接收到了資料,但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協議的實現下編寫一個響應訊息給客戶端,這個協議針對任何接收的資料都會返回一個響應。

    和discard server唯一不同的是把在此之前我們實現的channelRead()方法,返回所有的資料替代列印接收資料到控制檯上的邏輯。

說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。

package com.tjbsl.netty.demo2.echo; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; /**  * 服務端處理通道.  * ResponseServerHandler 繼承自 ChannelHandlerAdapter,  * 這個類實現了ChannelHandler介面,  * ChannelHandler提供了許多事件處理的介面方法,  * 然後你可以覆蓋這些方法。  * 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。  * 用來對請求響應  */ public class ResponseServerHandler extends ChannelHandlerAdapter {     /**      * 這裡我們覆蓋了chanelRead()事件處理方法。      * 每當從客戶端收到新的資料時,      * 這個方法會在收到訊息時被呼叫,      *ChannelHandlerContext物件提供了許多操作,      * 使你能夠觸發各種各樣的I/O事件和操作。      * 這裡我們呼叫了write(Object)方法來逐字地把接受到的訊息寫入      * @param ctx 通道處理的上下文資訊      * @param msg 接收的訊息      */     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg) {          ByteBuf in = (ByteBuf) msg;          System.out.println(in.toString(CharsetUtil.UTF_8));         ctx.write(msg);         //cxt.writeAndFlush(msg)         //請注意,這裡我並不需要顯式的釋放,因為在進入的時候netty已經自動釋放         // ReferenceCountUtil.release(msg);     }     /**      * ctx.write(Object)方法不會使訊息寫入到通道上,      * 他被緩衝在了內部,你需要呼叫ctx.flush()方法來把緩衝區中資料強行輸出。      * 或者你可以在channelRead方法中用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的      * @param ctx      * @throws Exception      */     @Override     public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {         ctx.flush();     }     /**      * 這個方法會在發生異常時觸發      *      * @param ctx      * @param cause      */     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         /***          * 發生異常後,關閉連線          */         cause.printStackTrace();         ctx.close();     } }

同樣以上執行後,可以通過telnet傳送資料,console裡會打印出你傳送的資料,同時你的命令列介面裡應該也會接收到相同的資料。

3.TIME服務(時間協議的服務)

    在這個部分被實現的協議是TIME協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的訊息,並且一旦訊息傳送就會立即關閉連線。在這個例子中,你會學習到如何構建和傳送一個訊息,然後在完成時主動關閉連線。

    因為我們將會忽略任何接收到的資料,而只是在連線被建立傳送一個訊息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現的內容:

說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。

1 //ch.pipeline().addLast(new DiscardServerHandler());        
2 //ch.pipeline().addLast(new ResponseServerHandler());
3 ch.pipeline().addLast(new TimeServerHandler());

TimeServerHandler類的如下:

package com.tjbsl.netty.demo3.time; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import io.netty.util.CharsetUtil; import java.util.Scanner; public class TimeServerHandler extends ChannelHandlerAdapter {     /**      * channelActive()方法將會在連線被建立並且準備進行通訊時被呼叫。      * 因此讓我們在這個方法裡完成一個代表當前時間的32位整數訊息的構建工作。      *      * @param ctx      */     @Override     public void channelActive(final ChannelHandlerContext ctx) {         /*Scanner cin=new Scanner(System.in);         System.out.println("請輸入傳送資訊:");         String name=cin.nextLine();*/         String name="HelloWorld!";         /**          * 為了傳送一個新的訊息,我們需要分配一個包含這個訊息的新的緩衝。          * 因為我們需要寫入一個32位的整數,因此我們需要一個至少有4個位元組的ByteBuf。          * 通過ChannelHandlerContext.alloc()得到一個當前的ByteBufAllocator,          * 然後分配一個新的緩衝。          */         final ByteBuf time = ctx.alloc().buffer(4);         time.writeBytes(name.getBytes());         /***          * 和往常一樣我們需要編寫一個構建好的訊息          * 。但是等一等,flip在哪?難道我們使用NIO傳送訊息時不是呼叫java.nio.ByteBuffer.flip()嗎?          * ByteBuf之所以沒有這個方法因為有兩個指標,          * 一個對應讀操作一個對應寫操作。          * 當你向ByteBuf裡寫入資料的時候寫指標的索引就會增加,          * 同時讀指標的索引沒有變化。          * 讀指標索引和寫指標索引分別代表了訊息的開始和結束。          * 比較起來,NIO緩衝並沒有提供一種簡潔的方式來計算出訊息內容的開始和結尾,          * 除非你呼叫flip方法。          * 當你忘記呼叫flip方法而引起沒有資料或者錯誤資料被髮送時,          * 你會陷入困境。這樣的一個錯誤不會發生在Netty上,          * 因為我們對於不同的操作型別有不同的指標。          * 你會發現這樣的使用方法會讓你過程變得更加的容易,          * 因為你已經習慣一種沒有使用flip的方式。          * 另外一個點需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法會返回一個ChannelFuture物件,          * 一個ChannelFuture代表了一個還沒有發生的I/O操作。          * 這意味著任何一個請求操作都不會馬上被執行,          * 因為在Netty裡所有的操作都是非同步的。          * 因此你需要在write()方法返回的ChannelFuture完成後呼叫close()方法,          * 然後當他的寫操作已經完成他會通知他的監聽者。          */         final ChannelFuture f = ctx.writeAndFlush(time); // (3)         /**          * 當一個寫請求已經完成是如何通知到我們?         * 這個只需要簡單地在返回的ChannelFuture上增加一個ChannelFutureListener。          * 這裡我們構建了一個匿名的ChannelFutureListener類用來在操作完成時關閉Channel。          */         f.addListener(new ChannelFutureListener() {             @Override             public void operationComplete(ChannelFuture future) {                 assert f == future;                 /***                  * 請注意,close()方法也可能不會立馬關閉,他也會返回一個ChannelFuture。                  */                 ctx.close();             }         });     }     //接收結果     @Override     public void channelRead(ChannelHandlerContext ctx, Object msg)             throws Exception {         ByteBuf buf = (ByteBuf) msg;         System.out.println("client:"+buf.toString(CharsetUtil.UTF_8));     }     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {         cause.printStackTrace();         ctx.close();     } }

4.Time客戶端

    不像DISCARD和ECHO的服務端,對於TIME協議我們需要一個客戶端因為人們不能把一個32位的二進位制資料翻譯成一個日期或者日曆。在這一部分,我們將會討論如何確保服務端是正常工作的,並且學習怎樣用Netty編寫一個客戶端。

    在Netty中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrap和Channel的實現。

package com.tjbsl.netty.demo3.time.client; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; public class TimeClient {     public static void main(String[] args) throws Exception {         String host = "127.0.0.1";         int port =8000;         EventLoopGroup workerGroup = new NioEventLoopGroup();         try {             /**              * 如果你只指定了一個EventLoopGroup,              * 那他就會即作為一個‘boss’執行緒,              * 也會作為一個‘workder’執行緒,              * 儘管客戶端不需要使用到‘boss’執行緒。              */             Bootstrap b = new Bootstrap(); // (1)             b.group(workerGroup); // (2)             /**              * 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被建立時使用              */             b.channel(NioSocketChannel.class); // (3)             /**              * 不像在使用ServerBootstrap時需要用childOption()方法,              * 因為客戶端的SocketChannel沒有父channel的概念。              */             b.option(ChannelOption.SO_KEEPALIVE, true); // (4)             b.handler(new ChannelInitializer<SocketChannel>() {                 @Override                 public void initChannel(SocketChannel ch) throws Exception {                     ch.pipeline().addLast(new TimeClientHandler());                 }             });             //用connect()方法代替了bind()方法             ChannelFuture f = b.connect(host, port).sync();             //等到執行結束,關閉             f.channel().closeFuture().sync();         } finally {             workerGroup.shutdownGracefully();         }     } }