1. 程式人生 > >NIO學習之Selector,SelectionKey與客戶端與服務端通訊簡單實現(1)

NIO學習之Selector,SelectionKey與客戶端與服務端通訊簡單實現(1)

選擇器(Selector)的 作用:將通道感興趣的事件放入佇列中,而不是馬上提交給應用程式,等已註冊的通道自己來請求處理這些事件。換句話說,就是選擇器將會隨時報告已經準備好了的通道,而且是按照先進先出的順序。

Selector類定義如下:
這裡寫圖片描述

open()方法,靜態方法,用於獲取1個Selector物件

keys()方法,用於獲取所有註冊到Selector物件上的SelectionKey

selectedKeys()方法,用於獲取已註冊通道發出的準備好了的SelectionKey

select()方法,呼叫此方法的執行緒會一直阻塞,直到註冊的通道中關注的事件發生了才會返回。 例:服務端的ServerSocketChannel

註冊了OP_ACCEPT事件,如果沒有客戶端連線服務端,那麼服務端將一直阻塞在呼叫select()方法的地方

select(long)方法,此方法與不帶引數的差不多,引數的含義是當阻塞的時間超過給定的時間引數,該方法將返回繼續執行

wakeup()方法,會喚醒當前被阻塞的執行緒,使其 select() 立即返回;如果當前執行緒沒有阻塞,那麼執行了wakeUp() 方法之後,下一個執行緒的 select() 方法會被立即返回,不再被阻塞下去

close()方法,能夠關閉當前的選擇器。當一個執行緒當前呈阻塞狀態,那麼中止這種狀態需要先呼叫Selector的 wakeUp() 方法。close()方法在實現類中先呼叫wakeUp()

方法喚醒被阻塞的執行緒,然後置空所有的通道、所有就緒的SelectionKey,讓這個選擇器上的輪詢元件也閒置下來。

選擇鍵(SelectionKey)的作用是表明哪個通道已經做好了相應的準備,相應的準備指SelectionKey類裡定義的OP_READ,OP_WRITE,OP_CONNECT,OP_ACCEPT中某個某個事件發生了。Selector不斷輪詢是否有事件準備好了,如果有事件準備好了則獲取事件相應的SelectionKey,進入事件處理

SelectionKey定義如下:
這裡寫圖片描述

SelectionKey主要說兩點:
attach(Object)主要用於繫結一個物件(物件可以任意型別
),繫結後可以在不同執行緒間共享,對應的attachment()方法用於獲取繫結的物件

interestOps()方法用於獲取通道“感興趣”的操作或事件

說了SelectorSelectionKey 的基礎知識及主要方法的使用,現在就來實踐一下。

服務端
虛擬碼

1.開啟1個ServerSocketChannel通道
2.把ServerSocketChannel設定為非阻塞的
3.為ServerSocketChannel通道繫結ip地址與埠
4.獲取Selector物件
5.把ServerSocketChannel通道感興趣的事件註冊到Selector物件裡
6.監聽埠,等待已註冊的通道關注的事件觸發
7.獲取事件觸發的通道進行相應的處理
public class NIOServer {
    private Selector selector;

    public void initServer(int port) throws IOException{
        // 開啟ServerSocket通道
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.socket().bind(new InetSocketAddress(port));

        // 獲取一個選擇器
        this.selector = Selector.open();

        // 將通道管理器與該通道進行繫結,併為該通道註冊SelectionKey.OP_ACCEPT事件
        // 註冊事件後,當該事件觸發時會使selector.select()返回,
        // 否則selector.select()一直阻塞
        serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);

    }

    public void listen() throws IOException{
        System.out.println("啟動伺服器!");
        while (true) {
            // select()方法一直阻塞直到有註冊的通道準備好了才會返回
            selector.select();
            Iterator<?> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = (SelectionKey) iterator.next();
                // 刪除已選的key,防止重複處理
                iterator.remove();
                handler(key);
            }
        }
    }

    public void handler(SelectionKey key)throws IOException{
        if (key.isAcceptable()) {
            handlerAccept(key);
        }else if (key.isReadable()){
            handlerRead(key);
        }else if (key.isWritable()){
            System.out.println("can write!");
        }else if (key.isConnectable()){
            System.out.println("is connectable");
        }
    }


    public void handlerAccept(SelectionKey key) throws IOException{
        // 從SelectionKey中獲取ServerSocketChannel
        ServerSocketChannel server = (ServerSocketChannel) key.channel(); 

        // 獲取SocketChannel
        SocketChannel socketChannel = server.accept();
        // 設定成非阻塞
        socketChannel.configureBlocking(false);
        System.out.println("與客戶端建立連線");

        // 為socketChannel通道建立 OP_READ 讀操作,使客戶端傳送的內容可以被讀到

        socketChannel.register(selector, SelectionKey.OP_READ);  


 // 往客戶端傳送傳送資訊   
      socketChannel.write(ByteBuffer.wrap("connected\n".getBytes()));
    }

    public void handlerRead(SelectionKey key)throws IOException{
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 建立讀取緩衝區
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        // 從通道讀取可讀取的位元組數
        try {
            int readcount = socketChannel.read(byteBuffer);
            if (readcount > 0) {
                byte[] data = byteBuffer.array();
                String msg = new String(data);
                System.out.println("服務端收到的資訊為:\n" + msg);
                ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
                socketChannel.write(outBuffer);
            } else {
                System.out.println("客戶端異常退出");

            }
        } catch (IOException e) {
            System.out.println("異常資訊:\n" + e.getMessage());
            key.cancel();
        }



    }

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.initServer(8888);
        server.listen();
    }
}

客戶端

public class NIOClient1 {

    private Selector selector;
    private SocketChannel socketChannel;
    private SocketAddress address;

    public NIOClient1(String host, int port) throws IOException {
        address = new InetSocketAddress(host, port);
        selector = Selector.open();
        socketChannel = SocketChannel.open(address);
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);

    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);
        System.out.print("輸入伺服器ip地址:\t");
        String host = scanner.next();
        System.out.print("輸入埠號:\t");
        int port = scanner.nextInt();
        try {
            NIOClient client = new NIOClient(host, port);


            Selector selector = client.getSelector();
            String msg = scanner.next();
            while (true) {
                if (msg.equals("q")) {
                    break;
                }
                client.getSocketChannel().write(ByteBuffer.wrap(msg.getBytes()));

                if (selector.select() > 0) {
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        if (key.isReadable()) {
                            SocketChannel socketChannel = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            int size = socketChannel.read(buffer);
                            if (size > 0) {
                                buffer.flip();
                                byte[] data = new byte[size];
                                buffer.get(data);
                                System.out.println("來自服務端的訊息:\t" + new String(data));
                            }

                        }
                        iterator.remove();
                    }

                }

                msg = scanner.next();
            }


        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Selector getSelector() {
        return selector;
    }

    public SocketChannel getSocketChannel() {
        return socketChannel;
    }

    public SocketAddress getAddress() {
        return address;
    }


    public void setSelector(Selector selector) {
        this.selector = selector;
    }

    public void setSocketChannel(SocketChannel socketChannel) {
        this.socketChannel = socketChannel;
    }

    public void setAddress(SocketAddress address) {
        this.address = address;
    }
}

執行效果
1. 先啟動服務端
2. 啟動客戶端


這裡寫圖片描述

參考部落格