1. 程式人生 > >Java NIO學習筆記(四) 使用JDK 1.7 NIO2.0 實現客戶端與伺服器的通訊

Java NIO學習筆記(四) 使用JDK 1.7 NIO2.0 實現客戶端與伺服器的通訊

JDK1.7 提供了全新的非同步NIO模式。稱為:NIO2.0或AIO。該模式引入了新的非同步通道的概念,並提供了非同步檔案通道和非同步套接字通道的實現。非同步通道提供兩種方式獲取獲取操作結果。分別是:

  • 通過java.util.concurrent.Future類來表示非同步操作的結果;
  • CompletionHandler介面的實現類作為操作完成的回撥。

NIO2.0的非同步套接字通道是真正的非同步非阻塞I/O,它對應UNIX網路程式設計中的事件驅動I/O(AIO),它不需要通過多路複用器(Selector)對註冊的通道進行輪詢操作即可實現非同步讀寫,從而簡化了NIO的程式設計模型。

回撥模式

AIO的回撥模式的服務端程式碼:

public class SocketServiceCb {

    AsynchronousServerSocketChannel asynchronousServerSocketChannel;
    CountDownLatch latch;

    public void start() {

        try {

            asynchronousServerSocketChannel = AsynchronousServerSocketChannel.open();
            asynchronousServerSocketChannel.bind(new
InetSocketAddress("127.0.0.1", 17777)); } catch (IOException e) { e.printStackTrace(); } latch = new CountDownLatch(1); doAccept(); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } private
void doAccept() { asynchronousServerSocketChannel.accept(this, new AccessCompleteHandler()); } class AccessCompleteHandler implements CompletionHandler<AsynchronousSocketChannel, SocketServiceCb> { @Override public void completed(AsynchronousSocketChannel result, SocketServiceCb attachment) { // 當我們呼叫AsynchronousServerSocketChannel的accept方法後,如果有新的客戶端連線接入, // 系統將回調我們傳入的CompletionHandler例項的completed方法,表示新的客戶端已經接入成功, // 因為一個AsynchronousServerSocket Channel可以接收成千上萬個客戶端, // 所以我們需要繼續呼叫它的accept方法,接收其他的客戶端連線, // 最終形成一個迴圈。每當接收一個客戶讀連線成功之後,再非同步接收新的客戶端連線。 attachment.asynchronousServerSocketChannel.accept(attachment, this); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.flip(); //引數1 ByteBuffer dst:接收緩衝區,用於從非同步Channel中讀取資料包; //引數2 A attachment:非同步Channel攜帶的附件,通知回撥的時候作為入參使用。即回撥方法的第二個引數 //引數3 CompletionHandler<Integer,? super A>:接收通知回撥的業務handler,本例程中為ReadCompletionHandler。 result.read(byteBuffer, byteBuffer, new ReadCompleteHandler(result)); } @Override public void failed(Throwable exc, SocketServiceCb attachment) { } } class ReadCompleteHandler implements CompletionHandler<Integer, ByteBuffer> { private AsynchronousSocketChannel channel; public ReadCompleteHandler(AsynchronousSocketChannel result) { this.channel = result; } @Override public void completed(Integer result, ByteBuffer attachment) { //flip操作,為後續從緩衝區讀取資料做準備 System.out.print("資料大小為:"+attachment.remaining()); byte[] bytes = new byte[attachment.remaining()]; attachment.get(bytes); try { System.out.print("伺服器接收:" + new String(bytes, "UTF-8")); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } doWrite("12312"); } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { this.channel.close(); } catch (IOException e) { e.printStackTrace(); } } private void doWrite(String data) { ByteBuffer byteBuffer = ByteBuffer.wrap(data.getBytes()); byteBuffer.flip(); channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer attachment) { if (attachment.hasRemaining()) { channel.write(attachment, attachment, this); }else { latch.countDown(); } } @Override public void failed(Throwable exc, ByteBuffer attachment) { try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } }

CompletionHandler介面即為回撥,它有兩個方法,執行成功的回撥和異常回調,分別如下。

  • public void completed(AsynchronousSocketChannel result, AsyncTimeServerHandler attachment);
  • public void failed(Throwable exc, AsyncTimeServerHandler attachment)。

AIO的回撥模式的客戶端程式碼:

CountDownLatch latch;

    @Test
    public void testNIOCallBack() throws IOException {
        final AsynchronousSocketChannel channel = AsynchronousSocketChannel.open();

        latch = new CountDownLatch(1);

        channel.connect(new InetSocketAddress("127.0.0.1", 17777),null, new CompletionHandler() {

            @Override
            public void completed(Object result, Object attachment) {
                System.out.print("連結成功");
                //final ByteBuffer byteBuffer = ByteBuffer.wrap("12312414".getBytes());
                byte[] bytes = "1231231231".getBytes();
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                byteBuffer.put(bytes);

                byteBuffer.flip();

                channel.write(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {

                    @Override
                    public void completed(Integer result, ByteBuffer attachment) {
                        if(attachment.hasRemaining()){
                            channel.write(attachment, attachment, this);
                        }else {
                            System.out.print("傳送成功");
                            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                            channel.read(readBuffer, readBuffer, new CompletionHandler<Integer, ByteBuffer>() {
                                @Override
                                public void completed(Integer result, ByteBuffer attachment) {

                                    byte[] bytes = new byte[attachment.remaining()];
                                    attachment.get(bytes);

                                    System.out.print("接受資訊:"+new String(bytes));

                                    latch.countDown();
                                }

                                @Override
                                public void failed(Throwable exc, ByteBuffer attachment) {
                                    exc.printStackTrace();

                                }
                            });
                        }
                    }

                    @Override
                    public void failed(Throwable exc, ByteBuffer attachment) {
                        exc.printStackTrace();

                    }
                });
            }

            @Override
            public void failed(Throwable exc, Object attachment) {
              exc.printStackTrace();
            }
        });

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

JDK底層通過執行緒池ThreadPoolExecutor來執行回撥通知,非同步回撥通知類由sun.nio.ch.AsynchronousChannelGroupImpl實現,它經過層層呼叫,最終回撥com.phei.netty.aio.AsyncTimeClientHandler$1.completed方法,完成回撥通知。由此我們也可以得出結論:非同步Socket Channel是被動執行物件,我們不需要像NIO程式設計那樣建立一個獨立的I/O執行緒來處理讀寫操作。對於AsynchronousServerSocket Channel和AsynchronousSocketChannel,它們都由JDK底層的執行緒池負責回撥並驅動讀寫操作。

Future模式

Future模式伺服器端的程式碼

public class SocketServiceAIO {

    private static ExecutorService executorService;
    private static AsynchronousServerSocketChannel serverSocketChannel;

    static class ChannelWorker implements Callable<String> {
        private CharBuffer charBuffer;
        private CharsetDecoder decoder = Charset.defaultCharset().newDecoder();
        private AsynchronousSocketChannel channel;

        ChannelWorker(AsynchronousSocketChannel channel) {
            this.channel = channel;
        }

        @Override
        public String call() throws Exception {

            final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

            //讀取請求
            while (channel.read(byteBuffer).get() != -1) {
                byteBuffer.flip();
                charBuffer = decoder.decode(byteBuffer);
                String request = charBuffer.toString().trim();
                System.out.println("客戶端請求:" + request);

                ByteBuffer outByteBuffer = ByteBuffer.wrap("請求收到".getBytes());

                Future future = channel.write(outByteBuffer);

                future.get();

                if (byteBuffer.hasRemaining()) {
                    byteBuffer.compact();
                } else {
                    byteBuffer.clear();
                }

            }
            channel.close();
            return "OK";
        }
    }

    private static void init() throws IOException {

        executorService = Executors.newCachedThreadPool(Executors.defaultThreadFactory());
        serverSocketChannel = AsynchronousServerSocketChannel.open();

        if (serverSocketChannel.isOpen()) {
            serverSocketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
            serverSocketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

            serverSocketChannel.bind(new InetSocketAddress("127.0.0.1", 17777));
        } else {
            throw new RuntimeException("通道未開啟");
        }

    }

    private static void start() {

        System.out.println("等待客戶端請求...");

        while (true) {
            //接收客戶端請求
            Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();

            try {

                //獲取請求
                AsynchronousSocketChannel channel = future.get();
                //提交給執行緒池
                executorService.submit(new ChannelWorker(channel));
            } catch (Exception ex) {
                ex.printStackTrace();
                System.err.println("伺服器關閉");
                executorService.shutdown();

                while (!executorService.isTerminated()) {

                }
                break;

            }
        }
    }

    public static void startTCPService() {
        try {
            init();
        } catch (IOException e) {
            e.printStackTrace();
            System.out.println("AIO初始化失敗");
        }


        try {
            start();

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("AIO初始化失敗");
        }
    }
}

Future客戶端程式碼:


 @Test
    public void test() {

        try {
            start();

        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("AIO初始化失敗");
        }
    }


    private void start() throws IOException, ExecutionException, InterruptedException {
        AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();

        if (socketChannel.isOpen()) {
            socketChannel.setOption(StandardSocketOptions.SO_RCVBUF, 4 * 1024);
            socketChannel.setOption(StandardSocketOptions.SO_REUSEADDR, true);

            Future future = socketChannel.connect(new InetSocketAddress("127.0.0.1", 17777));

            Object connect = future.get();

            if(connect != null){
                throw new RuntimeException("連結失敗");
            }

        } else {
            throw new RuntimeException("通道未開啟");
        }

        //傳送資料

        Future future = socketChannel.write(ByteBuffer.wrap("我是客戶端".getBytes()));

        future.get();

        //讀取伺服器的傳送的資料
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //讀取伺服器
        while(socketChannel.read(byteBuffer).get() != -1){
            byteBuffer.flip();

            CharBuffer charBuffer = Charset.defaultCharset().newDecoder().decode(byteBuffer);

            System.out.println("伺服器說:"+charBuffer.toString().trim());
            byteBuffer.clear();
        }
    }

至此,我們學習了在java中幾種不同的網路程式設計模式。這裡總是一下:

1.非同步非阻塞I/O

即本節所述的JDK 1.7 AIO。很多人喜歡將JDK1.4提供的NIO框架稱為非同步非阻塞I/O,但是,如果嚴格按照UNIX網路程式設計模型和JDK的實現進行區分,實際上它只能被稱為非阻塞I/O,不能叫非同步非阻塞I/O。因為它是基於Selctor的select/poll模型實現,它是基於I/O複用技術的非阻塞I/O,不是非同步I/O。在JDK1.5 update10和Linux core2.6以上版本,Sun優化了Selctor的實現,它在底層使用epoll替換了select/poll,上層的API並沒有變化,可以認為是JDK NIO的一次效能優化,但是它仍舊沒有改變I/O的模型。

由JDK1.7提供的NIO2.0,新增了非同步的套接字通道,它是真正的非同步I/O,在非同步I/O操作的時候可以傳遞訊號變數,當操作完成之後會回撥相關的方法,非同步I/O也被稱為AIO。

2.多路複用器/選擇器 Selector

在前面的章節我們介紹過Java NIO的實現關鍵是多路複用I/O技術,多路複用的核心就是通過Selector來輪詢註冊在其上的Channel,當發現某個或者多個Channel處於就緒狀態後,從阻塞狀態返回就緒的Channel的選擇鍵集合,進行I/O操作。由於多路複用器是NIO實現非阻塞I/O的關鍵

3.偽非同步I/O

偽非同步I/O的概念完全來源於實踐。在JDK NIO程式設計沒有流行之前,為了解決Tomcat通訊執行緒同步I/O導致業務執行緒被掛住的問題,大家想到了一個辦法:在通訊執行緒和業務執行緒之間做個緩衝區,這個緩衝區用於隔離I/O執行緒和業務執行緒間的直接訪問,這樣業務執行緒就不會被I/O執行緒阻塞。而對於後端的業務側來說,將訊息或者Task放到執行緒池後就返回了,它不再直接訪問I/O執行緒或者進行I/O讀寫,這樣也就不會被同步阻塞。像這樣通過執行緒池做緩衝區的做法來解決一連線一執行緒問題,習慣於稱它為偽非同步I/O,而官方並沒有偽非同步I/O這種說法,請大家注意。

4.同步阻塞IO

即最簡單,也最好理解,一個Socket連結,開啟一個執行緒去處理。

這裡寫圖片描述

如何選擇

如果客戶端併發連線數不多,伺服器的負載也不重,那就完全沒必要選擇NIO做服務端,畢竟非阻塞的處理IO是比阻塞IO的響應時間要慢一些(涉及多執行緒的上下文切換等問題);如果是相反情況,那就要考慮選擇合適的NIO框架進行開發。但並建議直接利用JDK的NIO來開發。

開發出高質量的NIO程式並不是一件簡單的事情,除去NIO固有的複雜性和BUG不談,作為一個NIO服務端,需要能夠處理網路的閃斷、客戶端的重複接入、客戶端的安全認證、訊息的編解碼、半包讀寫、網路擁塞等情況。由於NIO還涉及到Reactor模式,如果你沒有足夠的NIO網路程式設計和多執行緒程式設計經驗積累,一個NIO框架的穩定往往需要半年甚至更長的時間。更為糟糕的是,一旦在生產環境中發生問題,往往會導致跨節點的服務呼叫中斷,嚴重的可能會導致整個叢集環境都不可用,需要重啟伺服器,這種非正常停機會帶來巨大的損失。

從可維護性角度看,由於NIO採用了非同步非阻塞程式設計模型,而且是一個I/O執行緒處理多條鏈路,它的除錯和跟蹤非常麻煩,特別是生產環境中的問題,我們無法進行有效的除錯和跟蹤,往往只能靠一些日誌來輔助分析,定位難度很大。

由於上述原因,在大多數場景下,不建議大家直接使用JDK的NIO類庫,除非你精通NIO程式設計或者有特殊的需求。在絕大多數的業務場景中,我們可以使用NIO框架Netty來進行NIO程式設計,它既可以作為客戶端也可以作為服務端,同時支援UDP和非同步檔案傳輸,功能非常強大。

後續筆者將記錄Netty的學習筆記