1. 程式人生 > >Java異步非阻塞IO NIO使用與代碼分析

Java異步非阻塞IO NIO使用與代碼分析

package mes 127.0.0.1 back 之一 write throwable private 建立

[TOC]


Java異步非阻塞IO NIO使用與代碼分析

TimeServer程序的NIO實現完整代碼

TimeServer程序來自書本《Netty權威指南》,nio的代碼確實有些難懂(這也是後面需要使用Netty的原因之一),不過我對代碼加了註釋,這樣一來對nio的概念及基本的使用都會有一個非常清晰的認識:

服務端程序

TimeServer.java:

package cn.xpleaf.nio;

public class TimeServer {
    public static void main(String[] args) {
        int port = 8080;
        if (args != null && args.length > 0) {
            try {
                port = Integer.valueOf(port);
            } catch (Exception e) {
                // 采用默認值
            }
        }
        new Thread(new MultiplexerTimeServer(port)).start();
    }
}

MultiplexerTimeServer.java:

package cn.xpleaf.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.sql.Date;
import java.util.Iterator;
import java.util.Set;

public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel servChannel;

    private volatile boolean stop;

    /**
     * 初始化多路復用器,綁定監聽端口
     */
    public MultiplexerTimeServer(int port) {
        try {
            // 創建多路復用器Selector
            selector = Selector.open();
            // 創建ServerSocketChannel,它相當於是所有客戶端連接的父管道
            servChannel = ServerSocketChannel.open();
            // 將ServerSocketChannel設置為異步非阻塞
            servChannel.configureBlocking(false);
            // 綁定偵聽端口,backlog為1024,表示serverchannel容納的最大的客戶端數量為1024(個人查找資料得出的結果,不一定準確)
            servChannel.socket().bind(new InetSocketAddress(port), 1024);
            // 將ServerSocketChannel註冊到selector上,並監聽SelectionKey.OP_ACCEPT操作位
            servChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port : " + port);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }

    }

    public void stop() {
        this.stop = true;
    }

    @Override
    public void run() {
        while (!stop) {
            try {
                // timeout - 如果為正,則在等待某個通道準備就緒時最多阻塞 timeout 毫秒;如果為零,則無限期地阻塞;必須為非負數(API文檔)
                // 休眠時間為1s,無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次
                selector.select(1000);
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    // 獲取key值,通過對key進行操作,可以獲取到其所對應的註冊到selector上的channel
                    // 最初是只有一個ServerSocketChannel所對應的key,也就是前面所創建的servChannel,它相當於是所有客戶端連接的父管道
                    // nio的服務端就是通過它來創建與客戶端的連接的,因為目前的代碼就只有它監聽了SelectionKey.OP_ACCEPT操作位
                    key = it.next();
                    // 同時把該key值從selectedKeys集合中移除
                    it.remove();
                    // 處理該key值
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                // TODO Auto-generated catch block
                t.printStackTrace();
            }
        }

        // 多路復用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重復釋放資源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 對key進行處理
     * 
     * @param key
     * @throws IOException
     */
    public void handleInput(SelectionKey key) throws IOException {
        // 處理新接入的請求消息
        if (key.isValid()) {

            // 連接建立時
            if (key.isAcceptable()) {
                // 接收新的連接
                ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                SocketChannel sc = ssc.accept();
                // 設置SocketChannel為異步非阻塞
                sc.configureBlocking(false);
                // 註冊新的連接到多路復用器selector中,監聽SelectionKey.OP_READ操作位
                sc.register(selector, SelectionKey.OP_READ);
            }

            // 讀數據
            if (key.isReadable()) {
                // 通過key獲取到其註冊在Selector上的channel
                SocketChannel sc = (SocketChannel) key.channel();
                // 分配一個新的字節緩沖區,大小為1024KB,即1MB
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // 由於前面已經將該SocketChannel設置為異步非阻塞模式,因此它的read是非阻塞的
                // 返回值為讀取到的字節數
                // 返回值不同,意義不同:
                /**
                 * 大小0:讀到了字節,對字節進行編解碼 等於0:沒有讀取到字節,屬於正常場景,忽略 為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源
                 */
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    // 讀取到字節,進行解碼操作

                    // 將緩沖區當前的limit設置為position,position設置為0,用於後續對緩沖區的讀取操作(我想這是API中定義的吧)
                    readBuffer.flip();
                    // 根據緩沖區可讀的字節個數創建字節數組
                    byte[] bytes = new byte[readBuffer.remaining()];
                    // 將緩沖區可讀的字節數組復制到新創建的字節數組中
                    readBuffer.get(bytes);
                    // 將字節數組以utf-8方式轉換為字符串
                    String body = new String(bytes, "utf-8");
                    System.out.println("The time server receive order : " + body);
                    // 解析客戶端發送的指令,同時構造返回結果
                    String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                            ? new Date(System.currentTimeMillis()).toString()
                            : "BAD ORDER";
                    // 將應答消息異步發送給客戶端
                    doWrite(sc, currentTime);
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                } else {
                    ; // 讀到0字節忽略
                }
            }
        }
    }

    /**
     * 將應答消息異步發送給客戶端
     * 
     * @param channel
     * @param response
     * @throws IOException
     */
    public void doWrite(SocketChannel channel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            // 將字符串編碼為字節數組
            byte[] bytes = response.getBytes();
            // 根據字節數組的容量創建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            // 將字節數組復制到緩沖區
            writeBuffer.put(bytes);
            // flip操作
            writeBuffer.flip();
            // 將緩沖區的字節數組發送出去
            channel.write(writeBuffer);
            /**
             * 註意這裏並沒有處理半包問題,《Netty權威指南》中的說明如下(P35)
             * 需要指出的是,由於SocketChannel是異步非阻塞的,它並不保證一次能夠把需要發送的字節數組發送完,此時會出現半包問題。
             * 我們需要註冊寫操作,不斷輪詢Selector將沒有發送完的ByteBuffer發送完畢,然後可以通過ByteBuffer的hasRemain()方法
             * 判斷消息是否發送完成。此處僅僅是個簡單的入門級例程,沒有演示如何處理“寫半包”場景,後續的章節會有詳細說明。
             */
        }
    }

}

客戶端程序

TimeClient.java:

package cn.xpleaf.nio;

public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if(args != null && args.length > 0) {
            try {
                port = Integer.valueOf(port);
            } catch (Exception e) {
                // 采用默認值
            }
        }
        new Thread(new TimeClientHandle("127.0.0.1", port)).start();
    }
}

TimeClientHandle.java:

package cn.xpleaf.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TimeClientHandle implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;

    /**
     * 初始化多路復用器,設置連接的服務端地址和端口
     * 
     * @param host
     * @param port
     */
    public TimeClientHandle(String host, int port) {
        this.host = host == null ? "127.0.0.1" : host;
        this.port = port;
        try {
            // 創建多路復用器Selector
            selector = Selector.open();
            // 創建SocketChannel,用來連接服務端
            socketChannel = SocketChannel.open();
            // 將SocketChannel設置為異步非阻塞
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
    }

    @Override
    public void run() {
        // 先嘗試直接連接
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        // 當然這裏也可以將上面的直接連接代碼註釋,然後使用下面這兩行代碼
        // 只是需要註意的是,如果一開始沒有嘗試連接,那麽即使後來註冊偵聽連接也是沒有意義的
        // 此時沒有發送連接請求,服務端根本就不會響應
        // socketChannel.connect(new InetSocketAddress(host, port));
        // socketChannel.register(selector, SelectionKey.OP_CONNECT);
        while (!stop) {
            try {
                // timeout - 如果為正,則在等待某個通道準備就緒時最多阻塞 timeout 毫秒;如果為零,則無限期地阻塞;必須為非負數(API文檔)
                // 休眠時間為1s,無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次
                selector.select(1000);
                // 獲取所有就緒的channel的key
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    // 獲取key值,通過對key進行操作,可以獲取到其所對應的註冊到selector上的channel
                    // 最初是只有一個ServerSocketChannel所對應的key,也就是前面所創建的servChannel,它相當於是所有客戶端連接的父管道
                    // nio的服務端就是通過它來創建與客戶端的連接的,因為目前的代碼就只有它監聽了SelectionKey.OP_ACCEPT操作位
                    key = it.next();
                    // 同時把該key值從selectedKeys集合中移除
                    it.remove();
                    // 處理該key值
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        if (key != null) {
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        // 多路復用器關閉後,所有註冊在上面的Channel和Pipe等資源都會被自動去註冊並關閉,所以不需要重復釋放資源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 對key進行處理
     * 
     * @param key
     * @throws IOException
     */
    public void handleInput(SelectionKey key) throws IOException {
        if (key.isValid()) {

            // 通過key獲取到SocketChannel
            SocketChannel sc = (SocketChannel) key.channel();

            // isConnectable是判斷是否處於連接狀態
            // 如果是,說明服務端已經返回ACK應答消息,後面就需要對連接結果進行判斷
            if (key.isConnectable()) {
                // 對連接結果進行判斷
                if (sc.finishConnect()) {
                    // 註冊SocketChannel到多路復用器selector上,並監聽SelectionKey.OP_READ操作位
                    sc.register(selector, SelectionKey.OP_READ);
                    doWrite(sc);
                } else {
                    // 連接失敗,進程退出
                    System.exit(1);
                }
            }

            // 讀數據
            if (key.isReadable()) {
                // 分配一個新的字節緩沖區,大小為1024KB,即1MB
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                // 由於前面已經將該SocketChannel設置為異步非阻塞模式,因此它的read是非阻塞的
                // 返回值為讀取到的字節數
                // 返回值不同,意義不同:
                /**
                 * 大小0:讀到了字節,對字節進行編解碼 等於0:沒有讀取到字節,屬於正常場景,忽略 為-1:鏈路已經關閉,需要關閉SocketChannel,釋放資源
                 */
                int readBytes = sc.read(readBuffer);
                if (readBytes > 0) {
                    // 讀取到字節,進行解碼操作

                    // 將緩沖區當前的limit設置為position,position設置為0,用於後續對緩沖區的讀取操作(我想這是API中定義的吧)
                    readBuffer.flip();
                    // 根據緩沖區可讀的字節個數創建字節數組
                    byte[] bytes = new byte[readBuffer.remaining()];
                    // 將緩沖區可讀的字節數組復制到新創建的字節數組中
                    readBuffer.get(bytes);
                    // 將字節數組以utf-8方式轉換為字符串
                    String body = new String(bytes, "utf-8");
                    System.out.println("Now : " + body);
                } else if (readBytes < 0) {
                    // 對端鏈路關閉
                    key.cancel();
                    sc.close();
                } else {
                    ; // 讀到0字節忽略
                }
            }
        }
    }

    /**
     * 連接到服務端
     * 
     * @throws IOException
     */
    private void doConnect() throws IOException {
        // 如果直接連接成功,則註冊到多路復用器上,發送請求消息,讀應答
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel);
        } else {
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }

    /**
     * 寫操作
     * 
     * @throws IOException
     */
    private void doWrite(SocketChannel sc) throws IOException {
        // 將字符串編碼為字節數組
        byte[] req = "QUERY TIME ORDER".getBytes();
        // 根據字節數組的容量創建ByteBuffer
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
        // 將字節數組復制到緩沖區
        writeBuffer.put(req);
        // flip操作
        writeBuffer.flip();
        // 將緩沖區的字節數組發送出去
        sc.write(writeBuffer);
        if (!writeBuffer.hasRemaining()) {
            System.out.println("Send order 2 server succeesd.");
        }
        // 也是沒有處理"半包寫"的問題,可以查看服務端程序的代碼註釋說明
    }

}

程序測試

服務端執行:

The time server is start in port : 8080

客戶端執行:

Send order 2 server succeesd.
Now : 2018-02-10

此時再查看服務端的輸出結果:

The time server is start in port : 8080
The time server receive order : QUERY TIME ORDER

Java異步非阻塞IO NIO使用與代碼分析