1. 程式人生 > >【NIO詳解】基於NIO的client與server

【NIO詳解】基於NIO的client與server

服務端程式碼

public class Server_Test {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                port = 8080
; } } MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port); new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start(); } } class MultiplexerTimeServer implements Runnable { private Selector selector; private ServerSocketChannel servChannel; private
volatile boolean stop; //初始化多路複用器,繫結監聽埠 public MultiplexerTimeServer(int port) { try { //開啟多路複用器 selector = Selector.open(); //開啟伺服器通道 servChannel = ServerSocketChannel.open(); //設定伺服器通道為非阻塞模式 servChannel.configureBlocking(false
); //繫結埠,backlog指佇列的容量,提供了容量限制的功能,避免太多客戶端佔用太多伺服器資源 //serverSocketChannel有一個佇列,存放沒有來得及處理的客戶端,伺服器每次accept,就會從佇列中去一個元素。 servChannel.socket().bind(new InetSocketAddress(port), 1024); //把伺服器通道註冊到多路複用器上,並監聽阻塞事件 servChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("The server is start in port: " + port); } catch (IOException e) { e.printStackTrace(); System.exit(1); } } public void stop() { this.stop = true; } @Override public void run() { while (!stop) { try { //多路複用器開始工作(輪詢),選擇已就緒的通道 //等待某個通道準備就緒時最多阻塞1秒,若超時則返回。 selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (IOException e) { if (key != null) { key.cancel(); if (key.channel() != null) { key.channel().close(); } } } } } catch (IOException e) { e.printStackTrace(); } } //多路複用器關閉後,所有註冊在上面的Channel和Pipe等資源都會自動去註冊並關閉 //所以不需要重複釋放資源 if (selector != null) { try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { //獲取伺服器通道 ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); //執行阻塞方法(等待客戶端資源) SocketChannel sc = ssc.accept(); //設定為非阻塞模式 sc.configureBlocking(false); //註冊到多路複用器上,並設定為可讀狀態 sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { //讀取資料 SocketChannel sc = (SocketChannel)key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { //反轉緩衝區(復位) readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; //接受緩衝區資料 readBuffer.get(bytes); //trim方法返回字串的副本,忽略前導空白和尾部空白 String body = new String(bytes).trim(); // String body = new String(bytes, "UTF-8"); System.out.println("The time server receive order : " + body); // String currentTime = "QUERY TIME ORDER" // .equalsIgnoreCase(body) ? new java.util.Date // (System.currentTimeMillis()).toString() // : "BAD ORDER"; String currentTime = new Date(System.currentTimeMillis()).toString(); //給客戶端回寫資料 doWrite(sc, currentTime); } else if (readBytes < 0) { //對端鏈路關閉 key.cancel(); sc.close(); } } } } private void doWrite(SocketChannel channel, String response) throws IOException{ if (response != null && response.trim().length() > 0) { System.out.println(response); byte[] bytes = response.getBytes(); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } } }

程式看煩了吧!來看看這張圖幫助你理解程式

服務端通訊序列圖:

這裡寫圖片描述

其實,主要步驟可以分為

  • Selector.open():開啟一個Selector。
  • ServerSocketChannel.open():建立服務端的Channel。
  • bind():繫結到某個埠上。並配置非阻塞模式。
  • register():註冊Channel和關注的事件到Selector上。
  • select():拿到已經就緒的事件。

客戶端程式碼

public class Client_Test {
    public static void main(String[] args) throws UnknownHostException {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                port = 8080;
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1", port), "Time-Client-001").start();
    }
}
class TimeClientHandle implements Runnable{
    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    //預設boolean值為false
    private volatile boolean stop;

    public TimeClientHandle(String host, int port) {
        //host若為空,則設定為指定ip
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            //開啟多路複用器
            selector = Selector.open();
            //開啟管道
            socketChannel = SocketChannel.open();
            //設定管道為非阻塞模式
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    @Override
    public void run() {
        try {
            doConnect();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop) {
            try {
                //阻塞等待1s,若超時則返回
                selector.select(1000);
                //獲取所有selectionkey
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                //遍歷所有selectionkey
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    //獲取之後刪除
                    it.remove();
                    try {
                        //處理該selectionkey
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            //取消selectionkey
                            key.cancel();
                            if (key.channel() != null) {
                                //關閉該通道
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
        if (selector != null) {
            try {
                //關閉多路複用器
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void handleInput(SelectionKey key) throws IOException{
        //若該selectorkey可用
        if (key.isValid()) {
            //將key轉型為SocketChannel
            SocketChannel sc = (SocketChannel) key.channel();
            //判斷是否連線成功
            if (key.isConnectable()) {
                //若已經建立連線
                if (sc.finishConnect()) {
                    //向多路複用器註冊可讀事件
                    sc.register(selector, SelectionKey.OP_READ);
                    //向管道寫資料
                    doWrite(sc);
                }else {
                    //連線失敗 程序退出
                    System.exit(1);
                }
            }

            //若是可讀的事件
            if (key.isReadable()) {
                //建立一個緩衝區
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                System.out.println("before  :  "+readBuffer);
                //從管道中讀取資料然後寫入緩衝區中
                int readBytes = sc.read(readBuffer);
                System.out.println("after :  "+readBuffer);
                //若有資料
                if (readBytes > 0) {
                    //反轉緩衝區
                    readBuffer.flip();
                    System.out.println(readBuffer);

                    byte[] bytes = new byte[readBuffer.remaining()];
                    //獲取緩衝區並寫入位元組陣列中
                    readBuffer.get(bytes);
                    //將位元組陣列轉換為String型別
                    String body = new String(bytes);
                    System.out.println(body.length());
                    System.out.println("Now is : " + body + "!");
                    this.stop = true;
                } else if (readBytes < 0) {
                    key.cancel();
                    sc.close();
                } else {
                    sc.register(selector, SelectionKey.OP_READ);
                }
            }
        }
    }
    public void doConnect() throws IOException {
        //通過ip和埠號連線到伺服器
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            //向多路複用器註冊可讀事件
            socketChannel.register(selector, SelectionKey.OP_READ);
            //向管道寫資料
            doWrite(socketChannel);
        } else {
            //若連線伺服器失敗,則向多路複用器註冊連線事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }
    private void doWrite(SocketChannel sc) throws IOException {
        //要寫的內容
        byte[] req = "    -    QUERY TIME ORDER     -   ".getBytes();
        //為位元組緩衝區分配指定位元組大小的容量
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        //將內容寫入緩衝區
        writeBuffer.put(req);
        //反轉緩衝區
        writeBuffer.flip();
        //輸出列印緩衝區的可讀大小
        System.out.println(writeBuffer.remaining());
        //將內容寫入管道中
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            //若緩衝區中無可讀位元組,則說明成功傳送給伺服器訊息
            System.out.println("Send order 2 server succeed.");
        }
    }

}

程式看煩了吧!來看看這張圖幫助你理解程式

客戶端通訊序列圖:

這裡寫圖片描述



服務端執行結果:

這裡寫圖片描述



客戶端執行結果

這裡寫圖片描述

小結:

首先恭喜你看完了“這麼麻煩”的CS程式碼,和BIO相比,NIO的程式碼確實複雜了很多,但是不是就意味著我們必須要編寫這麼複雜的業務程式碼呢?答案是否定的,我感覺直接使用NIO程式設計,容易出錯,尤其是要讀寫轉換時要反轉緩衝區,那麼一個好的解決方法油然而生,那就是Netty,一個基於事件驅動的網路框架。