1. 程式人生 > >Netty學習之路(二)-非同步IO(NIO)程式設計

Netty學習之路(二)-非同步IO(NIO)程式設計

NIO到底是什麼簡稱?有人稱之為New I/O,原因為他相對於之前的I/O類庫來說是新增的。這是官方叫法。但是,由於之前老的I/O類庫是阻塞I/O,New I/O類庫的目標就是讓java支援非阻塞I/O,所以更多的人稱之為非阻塞I/O(Non-block I/O)。在開始進行NIO程式設計之前,先得了解幾個NIO類庫的概念:

  1. 通道(Channel):通道是對原 I/O 包中的流的模擬,可以通過它讀取和寫入資料。
    通道與流的不同之處在於,流只能在一個方向上移動(一個流必須是 InputStream 或者 OutputStream 的子類),而通道是雙向的,可以用於讀、寫或者同時用於讀寫。
    通道包括以下型別:
    FileChannel:從檔案中讀寫資料;
    DatagramChannel:通過 UDP 讀寫網路中資料;
    SocketChannel:通過 TCP 讀寫網路中資料;
    ServerSocketChannel:可以監聽新進來的 TCP 連線,對每一個新進來的連線都會創一個 SocketChannel。

  2. 緩衝區(Buffer):傳送給一個通道的所有資料都必須首先放到緩衝區中,同樣地,從通道中讀取的任何資料都要先讀到緩衝區中。也就是說,不會直接對通道進行讀寫資料,而是要先經過緩衝區。緩衝區實質上是一個數組,但它不僅僅是一個數組。緩衝區提供了對資料的結構化訪問,而且還可以跟蹤系統的讀/寫程序。緩衝區包括以下型別:
    ByteBuffer
    CharBuffer
    ShortBuffer
    IntBuffer
    LongBuffer
    FloatBuffer
    DoubleBuffer
    緩衝區狀態變數:
    capacity:最大容量;
    position:當前已經讀寫的位元組數;
    limit:還可以讀寫的位元組數。

  3. 選擇器(Selector):NIO 常常被叫做非阻塞 IO,主要是因為 NIO 在網路通訊中的非阻塞特性被廣泛使用。NIO 實現了 IO 多路複用中的 Reactor 模型,一個執行緒 Thread 使用一個選擇器 Selector 通過輪詢的方式去監聽多個通道 Channel 上的事件,從而讓一個執行緒就可以處理多個事件。通過配置監聽的通道 Channel 為非阻塞,那麼當 Channel 上的 IO 事件還未到達時,就不會進入阻塞狀態一直等待,而是繼續輪詢其它 Channel,找到 IO 事件已經到達的 Channel 執行。因為建立和切換執行緒的開銷很大,因此使用一個執行緒來處理多個事件而不是一個執行緒處理一個事件,對於 IO 密集型的應用具有很好地效能。應該注意的是,只有套接字 Channel 才能配置為非阻塞,而 FileChannel 不能,為 FileChannel 配置非阻塞也沒有意義。

程式設計實踐

NIOServer

package com.ph.NIO;


import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Create by PH on 2018/10/4 0004
 */
public class NIOServer {

    public static void main(String[] args) throws IOException {
        int port = 8080;
        if(args !=null && args.length>0) {
            try {
                port = Integer.valueOf(args[0]);
            }catch (NumberFormatException e) {
                //採用預設值
            }
        }
        MultiNioServer server = new MultiNioServer(port);
        //啟用服務端執行緒
        new Thread(server, "server-01").start();
    }
}

class MultiNioServer implements Runnable {
    private Selector selector;
    private ServerSocketChannel ssChannel;
    private volatile boolean stop;

    /**
     * 初始化多路複用器,繫結監聽埠
     */
    public MultiNioServer(int port) {
        try {
            //獲得一個選擇器
            selector = Selector.open();
            //開啟ServerSocketChannel,用於監聽客戶端的連線,它是所有客戶端連線的父管道
            ssChannel = ServerSocketChannel.open();
            //設定為非阻塞
            ssChannel.configureBlocking(false);
            //將通道註冊到Reactor執行緒的多路複用器Selector上,監聽ACCEPT事件
            ssChannel.register(selector, SelectionKey.OP_ACCEPT);
            //獲得與通道關聯的服務端套接字
            ServerSocket serverSocket = ssChannel.socket();
            InetSocketAddress address = new InetSocketAddress("127.0.0.1", port);
            //繫結監聽埠
            serverSocket.bind(address);
        }catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void stop() {
        this.stop = true;
    }

    public void run() {
        while (!stop) {
            try {
                //輪詢註冊在其上的Channel,如果某個Channel上面發生讀或者寫事件,這個Channel就處於就緒狀態,會被輪詢出來
                selector.select(1000);
                //通過SelectionKey獲取就緒的Channel集合
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> keyIterator = keys.iterator();
                SelectionKey key = null;
                //遍歷就緒的Channel
                while (keyIterator.hasNext()) {
                    key = keyIterator.next();
                    keyIterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        e.printStackTrace();
                        if(key != null) {
                            key.cancel();
                            if(key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }catch (IOException e) {
                e.printStackTrace();
            }
        }

        if(selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if(key.isValid()) {
            //處理新接入的請求訊息
            if (key.isAcceptable()) {
                System.out.println("New Client connect ...");
                //監聽到有新的客戶端接入
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                // 處理新的接入請求,完成TCP三次握手,建立物理鏈路,伺服器會為每個新連線建立一個 SocketChannel
                SocketChannel sc = ssc.accept();
                //設定客戶端鏈路為非阻塞模式
                sc.configureBlocking(false);
                // 將新接入的客戶端連線註冊到Reactor執行緒的多路複用器上,監聽讀操作,讀取客戶端傳送的網路訊息
                sc.register(selector, SelectionKey.OP_READ);
            }
            if (key.isReadable()) {
                SocketChannel sChannel = (SocketChannel) key.channel();
                System.out.println("Receive :" + readDataFromSocketChannel(sChannel));
                doWrite(sChannel, "Server receive succeed");
            }
        }
    }

    private String readDataFromSocketChannel(SocketChannel sChannel) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        StringBuilder data = new StringBuilder();
        while (true) {
            buffer.clear();
            int n = sChannel.read(buffer);
            //返回值大於0:讀到了位元組
            //返回值等於0:沒有讀取到位元組,屬於正常場景,忽略
            //返回值為-1: 鏈路已經關閉,需要關閉SocketChannel,釋放資源
            if (n == 0) {
                break;
            } else if(n < 0) {
                sChannel.close();
            }
            //將緩衝區當前的limit設定為position,position設定為0,用於後續對緩衝去的讀取操作。
            buffer.flip();
            //獲取緩衝區可讀位元組個數
            int limit = buffer.limit();
            char[] dst = new char[limit];
            for (int i = 0; i < limit; i++) {
                dst[i] = (char) buffer.get(i);
            }
            data.append(dst);
            buffer.clear();
        }
        return data.toString();
    }

    private void doWrite(SocketChannel sc, String response) throws IOException{
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //將資料寫入輸出緩衝區中
            writeBuffer.put(bytes);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if(!writeBuffer.hasRemaining()) {
                System.out.println("Send message succeed.");
            }
        }
    }
}

NIOClient

package com.ph.NIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

/**
 * Create by PH on 2018/10/4 0004
 */
public class NIOClient {
    public static void main(String[] args) throws IOException {
        int port = 8080;
        if(args !=null && args.length>0) {
            try {
                port = Integer.valueOf(args[0]);
            }catch (NumberFormatException e) {
                //採用預設值
            }
        }
        //啟用客戶端
        new Thread(new Client(null, port)).start();
    }
}

class Client implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    public Client(String host, int port ) {
        this.host = host == null? "127.0.0.1": host;
        this.port = port;
        try {
            selector = Selector.open();
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                selector.select(1000);
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = keys.iterator();
                SelectionKey key = null;
                while (iterator.hasNext()) {
                    key = iterator.next();
                    iterator.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                       if(key != null) {
                           key.cancel();
                           key.channel().close();
                       }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        if(selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void doConnect() throws IOException{
        //如果連線成功則註冊讀事件,反之註冊連線事件
        if(socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            System.out.println("Connect to server succeed ...");
            doWrite(socketChannel);
        } else {
            System.out.println("Reconnect to server...");
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
        if(key.isValid()) {
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()) {
                //如果完成連線客戶端註冊讀事件,並向服務端傳送訊息
                if(sc.finishConnect()) {
                    System.out.println("Reconnect succeed ...");
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                }else {
                    //連線失敗,程序退出
                    System.exit(1);
                }
            } else if(key.isReadable()) {
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                StringBuilder data = new StringBuilder();
                while (true) {
                    buffer.clear();
                    int n = sc.read(buffer);
                    if(n > 0) {
                        buffer.flip();
                        //獲取緩衝區可讀位元組個數
                        int limit = buffer.limit();
                        char[] dst = new char[limit];
                        for (int i = 0; i < limit; i++) {
                            dst[i] = (char) buffer.get(i);
                        }
                        data.append(dst);
                        buffer.clear();
                        System.out.println("Receive : " + data.toString());
                    } else {
                        break;
                    }
                }
            }
        }
    }

    private void doWrite(SocketChannel sc) throws IOException {
        byte[] req = "QUERY MESSAGE".getBytes();
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        writeBuffer.put(req);
        writeBuffer.flip();
        sc.write(writeBuffer);
        if(!writeBuffer.hasRemaining()) {
            System.out.println("Send message succeed.");
        }
    }
}