1. 程式人生 > >netty快速入門3個例項

netty快速入門3個例項

是一個netty快速入門的例子,也是我的學習筆記,比較簡單,翻譯於官方的文件整理後把所有程式碼註釋放在每一行程式碼中間,簡單明瞭地介紹一些基礎的用法。

   首頁這是基於netty5的例子,如果需要使用請依賴netty5的包。maven引用方式

1 <dependency>
2 <groupId>io.netty</groupId>
3 <artifactId>netty-all</artifactId>
4 <version>5.0.0.Alpha2</version>
5 </
dependency>

或者去下載最新的jar下載頁面

1.DISCARD服務(丟棄服務,指的是會忽略所有接收的資料的一種協議)

001 import io.netty.bootstrap.ServerBootstrap;
002 import io.netty.channel.ChannelFuture;
003 import io.netty.channel.ChannelInitializer;
004 import io.netty.channel.ChannelOption;
005 import io.netty.channel.EventLoopGroup;
006 import io.netty.channel.nio.NioEventLoopGroup;
007 import io.netty.channel.socket.SocketChannel;
008 import io.netty.channel.socket.nio.NioServerSocketChannel;
009
010 /**
011 * 處理資料
012 */
013 public class NettyServer {
014 private int port;
015 public NettyServer(
int port) {
016 this.port = port;
017 }
018 public void run() throws Exception {
019 /***
020 * NioEventLoopGroup 是用來處理I/O操作的多執行緒事件迴圈器,
021 * Netty提供了許多不同的EventLoopGroup的實現用來處理不同傳輸協議。
022 * 在這個例子中我們實現了一個服務端的應用,
023 * 因此會有2個NioEventLoopGroup會被使用。
024 * 第一個經常被叫做‘boss’,用來接收進來的連線。
025 * 第二個經常被叫做‘worker’,用來處理已經被接收的連線,
026 * 一旦‘boss’接收到連線,就會把連線資訊註冊到‘worker’上。
027 * 如何知道多少個執行緒已經被使用,如何對映到已經建立的Channels上都需要依賴於EventLoopGroup的實現,
028 * 並且可以通過建構函式來配置他們的關係。
029 */
030 EventLoopGroup bossGroup = new NioEventLoopGroup();
031 EventLoopGroup workerGroup = new NioEventLoopGroup();
032 System.out.println("準備執行埠:" + port);
033 try {
034 /**
035 * ServerBootstrap 是一個啟動NIO服務的輔助啟動類
036 * 你可以在這個服務中直接使用Channel
037 */
038 ServerBootstrap b = new ServerBootstrap();
039 /**
040 * 這一步是必須的,如果沒有設定group將會報java.lang.IllegalStateException: group not set異常
041 */
042 b = b.group(bossGroup, workerGroup);
043 /***
044 * ServerSocketChannel以NIO的selector為基礎進行實現的,用來接收新的連線
045 * 這裡告訴Channel如何獲取新的連線.
046 */
047 b = b.channel(NioServerSocketChannel.class);
048 /***
049 * 這裡的事件處理類經常會被用來處理一個最近的已經接收的Channel。
050 * ChannelInitializer是一個特殊的處理類,
051 * 他的目的是幫助使用者配置一個新的Channel。
052 * 也許你想通過增加一些處理類比如NettyServerHandler來配置一個新的Channel
053 * 或者其對應的ChannelPipeline來實現你的網路程式。
054 * 當你的程式變的複雜時,可能你會增加更多的處理類到pipline上,
055 * 然後提取這些匿名類到最頂層的類上。
056 */
057 b = b.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
058 @Override
059 public void initChannel(SocketChannel ch) throws Exception {
060 ch.pipeline().addLast(new DiscardServerHandler());
061 //ch.pipeline().addLast(new ResponseServerHandler());
062 // ch.pipeline().addLast(new TimeServerHandler());
063 }
064 });
065 /***
066 * 你可以設定這裡指定的通道實現的配置引數。
067 * 我們正在寫一個TCP/IP的服務端,
068 * 因此我們被允許設定socket的引數選項比如tcpNoDelay和keepAlive。
069 * 請參考ChannelOption和詳細的ChannelConfig實現的介面文件以此可以對ChannelOptions的有一個大概的認識。
070 */
071 b = b.option(ChannelOption.SO_BACKLOG, 128);
072 /***
073 * option()是提供給NioServerSocketChannel用來接收進來的連線。
074 * childOption()是提供給由父管道ServerChannel接收到的連線,
075 * 在這個例子中也是NioServerSocketChannel。
076 */
077 b = b.childOption(ChannelOption.SO_KEEPALIVE, true);
078 /***
079 * 繫結埠並啟動去接收進來的連線
080 */
081 ChannelFuture f = b.bind(port).sync();
082 /**
083 * 這裡會一直等待,直到socket被關閉
084 */
085 f.channel().closeFuture().sync();
086 finally {
087 /***
088 * 優雅關閉
089 */
090 workerGroup.shutdownGracefully();
091 bossGroup.shutdownGracefully();
092 }
093 }
094
095 public static void main(String[] args) throws Exception {
096 int port;
097 if (args.length > 0) {
098 port = Integer.parseInt(args[0]);
099 else {
100 port = 8000;
101 }
102 new NettyServer(port).run();
103 }
104 }
01 import io.netty.buffer.ByteBuf;
02 import io.netty.channel.ChannelHandlerAdapter;
03 import io.netty.channel.ChannelHandlerContext;
04 import io.netty.util.CharsetUtil;
05 import io.netty.util.ReferenceCountUtil;
06
07 /**
08 * 服務端處理通道.這裡只是列印一下請求的內容,並不對請求進行任何的響應
09 * DiscardServerHandler 繼承自 ChannelHandlerAdapter,
10 * 這個類實現了ChannelHandler介面,
11 * ChannelHandler提供了許多事件處理的介面方法,
12 * 然後你可以覆蓋這些方法。
13 * 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。
14 *
15 */
16 public class DiscardServerHandler extends ChannelHandlerAdapter {
17
18 /***
19 * 這裡我們覆蓋了chanelRead()事件處理方法。
20 * 每當從客戶端收到新的資料時,
21 * 這個方法會在收到訊息時被呼叫,
22 * 這個例子中,收到的訊息的型別是ByteBuf
23 * @param ctx 通道處理的上下文資訊
24 * @param msg 接收的訊息
25 */
26 @Override
27 public void channelRead(ChannelHandlerContext ctx, Object msg) {
28 try {
29 ByteBuf in = (ByteBuf) msg;
30 /*  while (in.isReadable()) {
31 System.out.print((char) in.readByte());
32 System.out.flush();
33 }*/
34 //這一句和上面註釋的的效果都是列印輸入的字元
35 System.out.println(in.toString(CharsetUtil.US_ASCII));
36 }finally {
37 /**
38 * ByteBuf是一個引用計數物件,這個物件必須顯示地呼叫release()方法來釋放。
39 * 請記住處理器的職責是釋放所有傳遞到處理器的引用計數物件。
40 */
41 ReferenceCountUtil.release(msg);
42 }
43 }
44
45 /***
46 * 這個方法會在發生異常時觸發
47 * @param ctx
48 * @param cause
49 */
50 @Override
51 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
52 /***
53 * 發生異常後,關閉連線
54 */
55 cause.printStackTrace();
56 ctx.close();
57 }
58
59 }

以上是一個丟棄服務的處理方式,你可以執行後通過telnet來發送訊息,來檢視是否正常執行,注意console裡會列印你的輸入內容。

2.ECHO服務(響應式協議)

    到目前為止,我們雖然接收到了資料,但沒有做任何的響應。然而一個服務端通常會對一個請求作出響應。讓我們學習怎樣在ECHO協議的實現下編寫一個響應訊息給客戶端,這個協議針對任何接收的資料都會返回一個響應。

    和discard server唯一不同的是把在此之前我們實現的channelRead()方法,返回所有的資料替代列印接收資料到控制檯上的邏輯。

說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。

1 //ch.pipeline().addLast(new DiscardServerHandler());        
2 ch.pipeline().addLast(new ResponseServerHandler());
3 //ch.pipeline().addLast(new TimeServerHandler());

下面是處理類ResponseServerHandler的程式碼

01 import io.netty.channel.ChannelHandlerAdapter;
02 import io.netty.channel.ChannelHandlerContext;
03
04 /**
05 * 服務端處理通道.
06 * ResponseServerHandler 繼承自 ChannelHandlerAdapter,
07 * 這個類實現了ChannelHandler介面,
08 * ChannelHandler提供了許多事件處理的介面方法,
09 * 然後你可以覆蓋這些方法。
10 * 現在僅僅只需要繼承ChannelHandlerAdapter類而不是你自己去實現介面方法。
11 * 用來對請求響應
12 */
13 public class ResponseServerHandler extends ChannelHandlerAdapter {
14
15 /**
16 * 這裡我們覆蓋了chanelRead()事件處理方法。
17 * 每當從客戶端收到新的資料時,
18 * 這個方法會在收到訊息時被呼叫,
19 *ChannelHandlerContext物件提供了許多操作,
20 * 使你能夠觸發各種各樣的I/O事件和操作。
21 * 這裡我們呼叫了write(Object)方法來逐字地把接受到的訊息寫入
22 * @param ctx 通道處理的上下文資訊
23 * @param msg 接收的訊息
24 */
25 @Override
26 public void channelRead(ChannelHandlerContext ctx, Object msg) {
27 ctx.write(msg);
28 //cxt.writeAndFlush(msg)
29
30 //請注意,這裡我並不需要顯式的釋放,因為在定入的時候netty已經自動釋放
31 // ReferenceCountUtil.release(msg);
32 }
33
34 /**
35 * ctx.write(Object)方法不會使訊息寫入到通道上,
36 * 他被緩衝在了內部,你需要呼叫ctx.flush()方法來把緩衝區中資料強行輸出。
37 * 或者你可以在channelRead方法中用更簡潔的cxt.writeAndFlush(msg)以達到同樣的目的
38 * @param ctx
39 * @throws Exception
40 */
41 @Override
42 public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
43 ctx.flush();
44 }
45
46 /**
47 * 這個方法會在發生異常時觸發
48 *
49 * @param ctx
50 * @param cause
51 */
52 @Override
53 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
54 /***
55 * 發生異常後,關閉連線
56 */
57 cause.printStackTrace();
58 ctx.close();
59 }
60
61 }

同樣以上執行後,可以通過telnet傳送資料,console裡會打印出你傳送的資料,同時你的命令列介面裡應該也會接收到相同的資料。

3.TIME服務(時間協議的服務)

    在這個部分被實現的協議是TIME協議。和之前的例子不同的是在不接受任何請求時他會發送一個含32位的整數的訊息,並且一旦訊息傳送就會立即關閉連線。在這個例子中,你會學習到如何構建和傳送一個訊息,然後在完成時主動關閉連線。

    因為我們將會忽略任何接收到的資料,而只是在連線被建立傳送一個訊息,所以這次我們不能使用channelRead()方法了,代替他的是,我們需要覆蓋channelActive()方法,下面的就是實現的內容:

說明NettyServer 還是用上面已經提供的類,只是把這段裡的登出部分修改成如下。

1 //ch.pipeline().addLast(new DiscardServerHandler());        
2 //ch.pipeline().addLast(new ResponseServerHandler());
3 ch.pipeline().addLast(new TimeServerHandler());

TimeServerHandler類的如下:

01 public class TimeServerHandler extends ChannelHandlerAdapter {
02
03 /**
04 * channelActive()方法將會在連線被建立並且準備進行通訊時被呼叫。
05 * 因此讓我們在這個方法裡完成一個代表當前時間的32位整數訊息的構建工作。
06 *
07 * @param ctx
08 */
09 @Override
10 public void channelActive(final ChannelHandlerContext ctx) {
11 /**
12 * 為了傳送一個新的訊息,我們需要分配一個包含這個訊息的新的緩衝。
13 * 因為我們需要寫入一個32位的整數,因此我們需要一個至少有4個位元組的ByteBuf。
14 * 通過ChannelHandlerContext.alloc()得到一個當前的ByteBufAllocator,
15 * 然後分配一個新的緩衝。
16 */
17 final ByteBuf time = ctx.alloc().buffer(4);
18 time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
19 /***
20 * 和往常一樣我們需要編寫一個構建好的訊息
21 * 。但是等一等,flip在哪?難道我們使用NIO傳送訊息時不是呼叫java.nio.ByteBuffer.flip()嗎?
22 * ByteBuf之所以沒有這個方法因為有兩個指標,
23 * 一個對應讀操作一個對應寫操作。
24 * 當你向ByteBuf裡寫入資料的時候寫指標的索引就會增加,
25 * 同時讀指標的索引沒有變化。
26 * 讀指標索引和寫指標索引分別代表了訊息的開始和結束。
27 * 比較起來,NIO緩衝並沒有提供一種簡潔的方式來計算出訊息內容的開始和結尾,
28 * 除非你呼叫flip方法。
29 * 當你忘記呼叫flip方法而引起沒有資料或者錯誤資料被髮送時,
30 * 你會陷入困境。這樣的一個錯誤不會發生在Netty上,
31 * 因為我們對於不同的操作型別有不同的指標。
32 * 你會發現這樣的使用方法會讓你過程變得更加的容易,
33 * 因為你已經習慣一種沒有使用flip的方式。
34 * 另外一個點需要注意的是ChannelHandlerContext.write()(和writeAndFlush())方法會返回一個ChannelFuture物件,
35 * 一個ChannelFuture代表了一個還沒有發生的I/O操作。
36 * 這意味著任何一個請求操作都不會馬上被執行,
37 * 因為在Netty裡所有的操作都是非同步的。
38 * 因此你需要在write()方法返回的ChannelFuture完成後呼叫close()方法,
39 * 然後當他的寫操作已經完成他會通知他的監聽者。
40 */
41 final ChannelFuture f = ctx.writeAndFlush(time); // (3)
42 /**
43 * 當一個寫請求已經完成是如何通知到我們?
44 * 這個只需要簡單地在返回的ChannelFuture上增加一個ChannelFutureListener。
45 * 這裡我們構建了一個匿名的ChannelFutureListener類用來在操作完成時關閉Channel。
46 */
47 f.addListener(new ChannelFutureListener() {
48 @Override
49 public void operationComplete(ChannelFuture future) {
50 assert f == future;
51 /***
52 * 請注意,close()方法也可能不會立馬關閉,他也會返回一個ChannelFuture。
53 */
54 ctx.close();
55 }
56 });
57 }
58 @Override
59 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
60 cause.printStackTrace();
61 ctx.close();
62 }
63 }

4.Time客戶端

    不像DISCARD和ECHO的服務端,對於TIME協議我們需要一個客戶端因為人們不能把一個32位的二進位制資料翻譯成一個日期或者日曆。在這一部分,我們將會討論如何確保服務端是正常工作的,並且學習怎樣用Netty編寫一個客戶端。

    在Netty中,編寫服務端和客戶端最大的並且唯一不同的使用了不同的BootStrap和Channel的實現。

01 public class TimeClient {
02 public static void main(String[] args) throws Exception {
03 String host = "127.0.0.1";
04 int port =8000;
05 EventLoopGroup workerGroup = new NioEventLoopGroup();
06 try {
07 /**
08 * 如果你只指定了一個EventLoopGroup,
09 * 那他就會即作為一個‘boss’執行緒,
10 * 也會作為一個‘workder’執行緒,
11 * 儘管客戶端不需要使用到‘boss’執行緒。
12 */
13 Bootstrap b = new Bootstrap(); // (1)
14 b.group(workerGroup); // (2)
15 /**
16 * 代替NioServerSocketChannel的是NioSocketChannel,這個類在客戶端channel被建立時使用
17 */
18 b.channel(NioSocketChannel.class); // (3)
19 /**
20 * 不像在使用ServerBootstrap時需要用childOption()方法,
21 * 因為客戶端的SocketChannel沒有父channel的概念。
22 */
23 b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
24 b.handler(new ChannelInitializer<SocketChannel>() {
25 @Override
26 public void initChannel(SocketChannel ch) throws Exception {
27 ch.pipeline().addLast(new TimeClientHandler());
28 }
29 });
30 //用connect()方法代替了bind()方法
31 ChannelFuture f = b.connect(host, port).sync();
32 //等到執行結束,關閉