1. 程式人生 > >Java NIO程式設計例項之三Selector

Java NIO程式設計例項之三Selector

Java NIO主要包含三個概念,即緩衝區(Buffer)、通道(Channel)和選擇器(Selector)。前面的文章已經介紹了緩衝區和通道,本文則講述最複雜的選擇器Selector。
本文是本系列的第三篇文章,關於緩衝區Buffer可以看第一篇:
https://zhuanlan.zhihu.com/p/25701512
關於通道Channel可以看第二篇:
https://zhuanlan.zhihu.com/p/25914350

1. Selector涉及的三個概念

在理解了Buffer和Channel之後,終於來到了最終的解決方案面前,那就是使用Selector來實現單執行緒控制多路非阻塞IO。Selector是如此重要,可以說它就是NIO非同步IO的核心控制器。Selector需要其他兩種物件配合使用,即SelectionKey和SelectableChannel,它們之間的關係如下圖所示:
這裡寫圖片描述


SelectableChannel是一類可以與Selector進行配合的通道,例如Socket相關通道以及Pipe產生的通道都屬於SelectableChannel。這類通道可以將自己感興趣的操作(例如read、write、accept和connect)註冊到一個Selector上,並在Selector的控制下進行IO相關操作。
Selector是一個控制器,它負責管理已註冊的多個SelectableChannel,當這些通道的某些狀態改變時,Selector會被喚醒(從select()方法的阻塞中),並對所有就緒的通道進行輪詢操作。
SelectionKey是一個用來記錄SelectableChannel和Selector之間關係的物件,它由SelectableChannel的register()方法返回,並存儲在Selector的多個集合中。它不僅記錄了兩個物件的引用,還包含了SelectableChannel感興趣的操作,即OP_READ,OP_WRITE,OP_ACCEPT和OP_CONNECT。

1.1 register方法

在展示例子程式碼之前,必須對一些概念和操作進行簡要的介紹。首先是SelectableChannel的register方法,它的正式定義為:

SelectionKey register(Selector sel, int ops)

第一個引數指明要註冊的Selector,第二個引數指明本通道感興趣的操作,此引數的取值可以是SelectionKey.OP_ACCEPT等四個,以及它們的邏輯值,例如SelectionKey.OP_READ & SelectionKey.OP_WRITE。方法的返回值是一個SelectionKey,這個物件會被自動加入Selector的keys集合,因此不必特意保留這個SelectionKey的物件引用,需要時可以使用Selector的keys()方法得到所有的SelectionKey物件引用。
註冊完成後,該通道就與Selector保持關聯了。當通道的狀態改變時,其改變會自動被Selector感知,並在Selector的三個集合中反應出來。

1.2 Selector的三個集合

如上圖所示,Selector物件會維持三個SelectionKey集合,分別是keys集合,儲存了所有與Selector關聯的SelectionKey物件;selectedKeys集合,儲存了在一次select()方法呼叫後,所有狀態改變的通道關聯的SelectionKey物件;cancelledKeys集合,儲存了一輪select()方法呼叫過程中,所有被取消但還未從keys中刪除的SelectionKey物件。
其中最值得關注的是selectedKeys集合,它使用Selector物件的selectedKeys()方法獲得,並通常會進行輪詢處理。

1.3 select方法

Selector類的select()方法是一個阻塞方法,它有兩種形式:
int select()
int select(long timeout)
不帶引數的方法會一直阻塞,直到至少有一個註冊的通道狀態改變,才會被喚醒;帶有timeout引數的方法會一直阻塞,直到時間耗盡,或者有通道的狀態改變。

1.4 輪詢處理

在一次select()方法返回後,應對selectedKeys集合中的所有SelectionKey物件進行輪詢操作,並在操作完成後手動將SelectionKey物件從selectedKeys集合中刪除。

2. Selector程式碼例項

在展示具體的程式碼之前,先畫一個從《Netty In Action》書上抄來的圖:
這裡寫圖片描述
服務端程式碼:

public class SelectorServer {
    private static final int PORT = 1234;
    private static ByteBuffer buffer = ByteBuffer.allocate(1024);

    public static void main(String[] args) {
        try {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            //1.register()
            Selector selector = Selector.open();
            ssc.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("REGISTER CHANNEL , CHANNEL NUMBER IS:" + selector.keys().size());

            while (true) {
                //2.select()
                int n = selector.select();
                if (n == 0) {
                    continue;
                }
                //3.輪詢SelectionKey
                Iterator<SelectionKey> iterator = (Iterator) selector.selectedKeys().iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    //如果滿足Acceptable條件,則必定是一個ServerSocketChannel
                    if (key.isAcceptable()) {
                        ServerSocketChannel sscTemp = (ServerSocketChannel) key.channel();
                        //得到一個連線好的SocketChannel,並把它註冊到Selector上,興趣操作為READ
                        SocketChannel socketChannel = sscTemp.accept();
                        socketChannel.configureBlocking(false);
                        socketChannel.register(selector, SelectionKey.OP_READ);
                        System.out.println("REGISTER CHANNEL , CHANNEL NUMBER IS:" + selector.keys().size());
                    }
                    //如果滿足Readable條件,則必定是一個SocketChannel
                    if (key.isReadable()) {
                        //讀取通道中的資料
                        SocketChannel channel = (SocketChannel) key.channel();
                        readFromChannel(channel);
                    }
                    //4.remove SelectionKey
                    iterator.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void readFromChannel(SocketChannel channel) {
        buffer.clear();
        try {
            while (channel.read(buffer) > 0) {
                buffer.flip();
                byte[] bytes = new byte[buffer.remaining()];
                buffer.get(bytes);
                System.out.println("READ FROM CLIENT:" + new String(bytes));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

首先註冊了一個ServerSocketChannel,它用來監聽1234埠上的連線;當監聽到連線時,把連線上的SocketChannel再註冊到Selector上,這些SocketChannel註冊的是SelectionKey.OP_READ事件;當這些SocketChannel狀態變為可讀時,讀取資料並顯示。
客戶端程式碼:

public class SelectorClient {
    static class Client extends Thread {
        private String name;
        private Random random = new Random(47);

        Client(String name) {
            this.name = name;
        }

        @Override
        public void run() {
            try {
                SocketChannel channel = SocketChannel.open();
                channel.configureBlocking(false);
                channel.connect(new InetSocketAddress(1234));
                while (!channel.finishConnect()) {
                    TimeUnit.MILLISECONDS.sleep(100);
                }
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                for (int i = 0; i < 5; i++) {
                    TimeUnit.MILLISECONDS.sleep(100 * random.nextInt(10));
                    String str = "Message from " + name + ", number:" + i;
                    buffer.put(str.getBytes());
                    buffer.flip();
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                    buffer.clear();
                }
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.submit(new Client("Client-1"));
        executorService.submit(new Client("Client-2"));
        executorService.submit(new Client("Client-3"));
        executorService.shutdown();
    }
}

客戶端建立了三個執行緒,每個執行緒建立一個SocketChannel通道,並連線到伺服器,並向伺服器傳送5條訊息。

3. 小結

Selector是Java NIO的核心概念,以至於一些人直接將NIO稱之為Selector-based IO。要學會Selector的使用首先是要明白其相關的多個概念,並多多動手去寫。
至此《Java NIO程式設計例項》系列的三篇就寫完了,接下來應該好好介紹一下Netty了,畢竟它才是在具體的Java服務端程式設計用得最多的框架。Netty克服了NIO中一些概念和設計上的不足之處,提供了更加優雅的解決方案。但是,要學好用好Netty,學習NIO是必經之路,有了NIO的基礎,才能真正學好Netty。