1. 程式人生 > >Netty從零開始(一)

Netty從零開始(一)

需要用到netty,之前就當年實習的時候用過Mina,netty沒用過,所以加急學習了一下,感覺還不錯,不多說,從官網入手,官網地址:http://netty.io/wiki/user-guide-for-4.x.html

有興趣的朋友可以自己去檢視。

前言:

問題

現在我們使用通用應用程式或包進行通訊。例如,我們經常使用HTTP客戶端庫從Web伺服器檢索資訊,並通過Web服務呼叫遠端過程呼叫。然而,通用協議或其實現有時不能很好地擴充套件。這就像我們不使用通用HTTP伺服器來交換大量檔案,電子郵件和近實時訊息,如財務資訊和多人遊戲資料。需要的是高度優化的協議實現,專門用於特殊目的。例如,您可能希望實現針對基於AJAX的聊天應用程式,媒體流或大型檔案傳輸進行了優化的HTTP伺服器。
你甚至可以設計和實施一個全新的協議,這個協議是根據你的需要而定製的。另一個不可避免的情況是當您必須處理舊版專有協議以確保與舊系統的互操作性。在這種情況下重要的是我們能夠快速實現該協議,而不會犧牲最終應用程式的穩定性和效能。

方案

Netty專案是為了快速開發可維護的高效能高可擴充套件性協議伺服器和客戶端而努力提供非同步事件驅動的網路應用程式框架和工具。換句話說,Netty是一個NIO客戶端伺服器框架,可以快速輕鬆地開發諸如協議伺服器和客戶端之類的網路應用程式。它大大簡化了網路程式設計流程,如TCP和UDP套接字伺服器開發。
“快速和容易”並不意味著由此產生的應用程式將遭受可維護性或效能問題的困擾。Netty經過精心設計,實現了許多協議,如FTP,SMTP,HTTP以及各種基於二進位制和基於文字的傳統協議。因此,Netty成功地找到了一種方法來實現輕鬆的開發,效能,穩定性和靈活性,而無需妥協。

有些使用者可能已經找到了聲稱具有相同優勢的其他網路應用程式框架,您可能想問問Netty與他們的區別。答案是它建立的哲學。Netty旨在為您提供API和執行方面最舒適的體驗,從第一天開始。這不是有形的東西,但你會意識到,這個哲學將使你的生活更容易,當你閱讀本指南和玩Netty的時候。
好了,以上就是關於netty的一個官網的初步介紹吧。下面進入搭建最簡單的伺服器的環節,我這裡會按照官網的思路走,不過不會完全一點不差。好了,我們開始:

建立專案

首先我們需要建立專案,如下圖所示:
專案名稱是NettyDemo,官網建議使用JDK1.6以上,我這裡使用的JDK1.8,然後加入使用maven匯入Netty依賴:
<dependencies>
    <!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.6.Final</version>
    </dependency>
</dependencies>
那麼現在我們可以正式開始我們的專案編寫了。

編寫一個Discard伺服器(按我理解就是啥也不幹的伺服器,彆著急反駁,往下看)

世界上最簡單的協議不是“hello world”,而是。。。。什麼也不做的協議Discard,丟棄的意思,服務端丟棄,那就是啥也不做的協議唄(嘗試把協議理解為使用者自定義功能)。 想要實現一個Discard協議,那麼我們唯一需要做的就是忽略所有接收到的資料。讓我們從處理器實現開始,它處理由netty生成的I/O事件。 首先我們建立一個java包:netty_beginner,然後在裡面建立一個類DiscardServerHandler 類的內容如下:
package netty_beginner;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;


/**
 * Created by moon on 2017/4/5.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
    @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { // (2)
//        super.channelRead(ctx, msg);
((ByteBuf) msg).release(); // (3)
//        ByteBuf in = (ByteBuf) msg;
//        try {
//            while (in.isReadable()) {
//                System.out.print((char) in.readByte());
//                System.out.flush();
//            }
//        } finally {
//            ReferenceCountUtil.release(msg);
//        }
}

    @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { // (5)
//        super.exceptionCaught(ctx, cause);
cause.printStackTrace();
        ctx.close();
    }
}

  1. 在這裡,我們重寫通道讀取channelRead()事件處理方法。每當從客戶端收到新資料時,都會使用接收到的訊息呼叫此方法。在這個例子中,接收到的訊息的型別是ByteBuf。
  2. 為了實現DISCARD協議,處理程式必須忽略收到的訊息。ByteBuf是一個引用計數物件,必須通過release()方法顯式釋放。請記住,處理程式有責任釋放傳遞給處理程式的引用計數物件。通常,channelRead()處理方法的實現方式如下:
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        try {
            // Do something with msg
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }
  3. 當由於I / O錯誤或由於在處理事件時丟擲異常而使得Netty丟擲異常時,exceptionCaught() 事件將會被Throwable丟擲。在大多數情況下,應該記錄捕獲到的異常,並在此關閉其關聯的通道,雖然這種方法的實現可以根據你想要處理的異常情況而有所不同。例如,您可能希望在關閉連線之前傳送帶有錯誤程式碼的響應訊息。
到目前位置一切順利。我們已經實現了DISCARD伺服器的前半部分。現在剩下的是寫入使用DiscardServerHandler啟動伺服器的main()方法。我們建立另外一個類:DiscardServer如下:
package netty_beginner;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Created by moon on 2017/4/5.
 */
public class DiscardServer {
    private int port;

    public DiscardServer(int port) {
        this.port = port;
    }

    public void run() throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
                        @Override
public void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new DiscardServerHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)          // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
            // Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
            // Wait until the server socket is closed.
            // In this example, this does not happen, but you can do that to gracefully
            // shut down your server.
f.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new DiscardServer(port).run();
    }
}

  1. NioEventLoopGroup是一個處理I / O操作的多執行緒事件迴圈。Netty為不同型別的傳輸提供了各種EventLoopGroup實現。在這個例子中,我們正在實現一個伺服器端應用程式,因此將使用兩個NioEventLoopGroup。第一個,通常稱為“老闆”,接受傳入的連線。第二個,通常稱為“工人”,一旦老闆接受連線並將接受的連線註冊給工作人員,就處理接受的連線的流量。使用多少執行緒以及它們如何對映到建立的通道取決於EventLoopGroup實現,甚至可以通過建構函式進行配置。
  2. ServerBootstrap是一個幫助類,用於設定伺服器。您可以直接使用Channel設定伺服器。但是請注意,這是一個繁瑣的過程,在大多數情況下您不需要這樣做。
  3. 在這裡,我們指定使用NioServerSocketChannel類來例項化一個新的Channel來接受傳入的連線。(可以這麼理解,每個客戶端連線我們服務端,我們都會為他們建立一個channel,那麼這個channel對於面向物件的我們來說就是一個類,我們同意對於我們接受到的連線都初始化為:NioServerSocketChannel
  4. 這裡指定的處理程式將始終由新接受的Channel進行評估。ChannelInitializer是一個特殊的處理程式,旨在幫助使用者配置新的Channel。很可能您想通過新增一些處理程式(如DiscardServerHandler)來配置新Channel的ChannelPipeline來實現您的網路應用程式。隨著應用程式的複雜化,您可能會在管道中新增更多的處理程式,並將這個匿名類最終提取到頂級類中。(個人感覺說白了就是想自己實現包含自己處理邏輯的Channel,但是又需要包含一些通用的原有功能,咋辦,繼承唄,這就是為什麼上面的DiscardServerHandler繼承netty的類)
  5. 您還可以設定特定於Channel實現的引數。我們正在編寫一個TCP / IP伺服器,因此我們可以設定套接字選項,如tcpNoDelay和keepAlive。請參閱ChannelOption的apidocs和特定的ChannelConfig實現,以獲得有關支援的ChannelOptions的概述。
  6. 你有沒有注意到option()和childOption()?option()用於接受傳入連線的NioServerSocketChannel。childOption()用於在這種情況下由父級ServerChannel接受的通道,即NioServerSocketChannel。(我的理解就是前者用於配置我們父級Channel,後者用於配置我們自定義的子級Channel)。
  7. 我們現在準備好了。剩下的是繫結到埠並啟動伺服器。這裡,我們繫結機器中所有NIC(網路介面卡)的埠到8080。您現在可以根據需要呼叫bind()方法多次(具有不同的繫結地址)。

恭喜,到了現在這個階段我們已經完成了。下面可以進行嘗試,那麼在嘗試之前,我要說一句,這個例子非常好,就是一點比較費解,即使我開始測試,往本機8080埠傳送內容,我們根本看不出來是否成功,因為我們把內容忽略了 - -!。所以改一下,我們的DiscardServerHandler改成如下,列印收到的字元:

package netty_beginner;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;


/**
 * Created by moon on 2017/4/5.
 */
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
    @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//        super.channelRead(ctx, msg);
//        ((ByteBuf) msg).release();
ByteBuf in = (ByteBuf) msg;
        try {
            while (in.isReadable()) {
                System.out.print((char) in.readByte());
                System.out.flush();
            }
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

    @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//        super.exceptionCaught(ctx, cause);
cause.printStackTrace();
        ctx.close();
    }
}

然後我們啟動執行我們的main方法:


由於我用的win,不是linux,所以下面的操作可能有人不好使,因為有的win預設沒有啟動telnet,這個自己網上百度,很容易。我們開啟cmd,輸入telnet,進入一個新的視窗:



然後我們可以檢視幫助,輸入?/help,檢視win下的使用方式:
注意o,也就是open是我們所需的,我們使用命令:open localhost 8080如下圖:
這說明已經連上了,別跟我似的一直以為在連線ing。。。。。   那麼我們現在就可以聯絡了,由於我們的邏輯是一個字元一個字元輸出,所以我們輸入hello,在idea控制檯會挨個字元輸出:
那麼到這裡,說明我們的服務端小demo成功。

寫一個Echo伺服器

到目前為止,我們一直都在假設服務端是沒有響應的。然而,伺服器通常應該響應請求。讓我們學習如何通過實現ECHO協議向客戶端寫入響應訊息,其中任何接收到的資料都將被髮回。 與前面部分實現的Discard伺服器的唯一區別在於它將接收到的資料發回,而不是將接收的資料輸出到控制檯.因此,再次修改channelRead()方法就行了:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.write(msg); // (1)
ctx.flush(); // (2)
}

ChannelHandlerContext物件提供了各種可以觸發各種I / O事件和操作的操作。在這裡,我們呼叫write(Object)來逐字寫入接收到的訊息。請注意,我們沒有像DISCARD示例中那樣釋出接收的訊息。這是因為,當Netty釋出給電子郵件時,Netty會為您報告。
如果再次執行telnet命令,您將看到伺服器傳送回傳送給它的任何內容。(自行檢視)

編寫一個時間伺服器

本節中實現的協議是TIME協議。它與前面的示例不同之處在於,它傳送一個包含32位整數的訊息,而不接收任何請求,並在傳送訊息後關閉連線。在此示例中,您將學習如何構建和傳送訊息,並在完成時關閉連線。
因為我們不是將忽略任何接收到的資料,而是在建立連線後立即傳送訊息,這次我們不能使用channelRead()方法。相反,我們應該覆蓋channelActive()方法。所以我們建立一個新的類TimeServerHandler,以下是實現:
package netty_beginner;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Created by moon on 2017/4/5.
 */
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
    @Override
public void channelActive(final ChannelHandlerContext ctx) throws Exception {
        final ByteBuf time = ctx.alloc().buffer(4);
        time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));

        final ChannelFuture f = ctx.writeAndFlush(time);
        f.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture future) throws Exception {
                assert f == future;
                ctx.close();
            }
        });

    }

    @Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

  1. 如上所述,當建立連線並準備好傳送時,將呼叫channelActive()方法。我們來寫一個32位整數,表示這個方法當前的時間。
  2. 要傳送一條新訊息,我們需要分配一個包含訊息的新緩衝區。我們要寫一個32位整數,因此我們需要一個容量至少為4個位元組的ByteBuf。通過ChannelHandlerContext.alloc()獲取當前的ByteBufAllocator,並分配一個新的緩衝區。
  3. 像往常一樣,我們編寫構造的訊息。但等等,翻轉的地方在哪裡?在NIO傳送訊息之前,我們是否曾經呼叫過java.nio.ByteBuffer.flip()?ByteBuf沒有這樣的方法,因為它有兩個指標;一個用於讀操作,另一個用於寫操作。當你寫入東西到ByteBuf中,寫索引增加,而讀索引並沒有改變。讀者索引和作者索引分別表示訊息的開始和結束位置。相比之下,如果不呼叫flip方法,NIO緩衝區不能提供乾淨的方式來確定訊息內容的起始和結束位置。當您忘記翻轉緩衝區時,您將會遇到麻煩,因為你會發送不正確甚至是空的內容。在Netty中不會發生這樣的錯誤,因為我們針對不同的操作型別有不同的指標。你會發現它使你的生活更容易,當你習慣了 - 一個沒有翻轉的生活!另外要注意的是,ChannelHandlerContext.write()(和writeAndFlush())方法返回一個ChannelFuture。ChannelFuture表示尚未發生的I / O操作。這意味著任何請求的操作可能尚未執行,因為所有操作在Netty中都是非同步的。例如,即使在傳送訊息之前,以下程式碼也可能會關閉連線:
    Channel ch = ...;
    ch.writeAndFlush(message);
    ch.close();
    因此,您需要在ChannelFuture完成之後呼叫close()方法,該方法由write()方法返回,並且在寫入操作完成後通知其監聽器。請注意,close()也可能不會立即關閉連線,並且它返回ChannelFuture。
  4. 當寫請求完成時,我們如何得到通知?這就像將ChannelFutureListener新增到返回的ChannelFuture一樣簡單。在這裡,我們建立了一個新的匿名ChannelFutureListener,當操作完成時關閉通道。或者,您可以使用預定義的監聽器簡化程式碼:
    f.addListener(ChannelFutureListener.CLOSE);
要測試我們的時間伺服器是否按預期工作,可以使用UNIX rdate命令:
$ rdate -o <port> -p <host>

由於我的是win我就不測試了。 好了,這次就先到這裡,明天繼續接下來的內容。