1. 程式人生 > >通俗程式設計——白話NIO之Selector

通俗程式設計——白話NIO之Selector

1. Selector簡介

選擇器提供選擇執行已經就緒的任務的能力.從底層來看,Selector提供了詢問通道是否已經準備好執行每個I/O操作的能力。Selector 允許單執行緒處理多個Channel。僅用單個執行緒來處理多個Channels的好處是,只需要更少的執行緒來處理通道。事實上,可以只用一個執行緒處理所有的通道,這樣會大量的減少執行緒之間上下文切換的開銷。

在開始之前,需要回顧一下Selector、SelectableChannel和SelectionKey:

選擇器(Selector)

Selector選擇器類管理著一個被註冊的通道集合的資訊和它們的就緒狀態。通道是和選擇器一起被註冊的,並且使用選擇器來更新通道的就緒狀態。當這麼做的時候,可以選擇將被激發的執行緒掛起,直到有就緒的的通道。

可選擇通道(SelectableChannel)

SelectableChannel這個抽象類提供了實現通道的可選擇性所需要的公共方法。它是所有支援就緒檢查的通道類的父類。因為FileChannel類沒有繼承SelectableChannel因此是不是可選通道,而所有socket通道都是可選擇的,包括從管道(Pipe)物件的中獲得的通道。SelectableChannel可以被註冊到Selector物件上,同時可以指定對那個選擇器而言,那種操作是感興趣的。一個通道可以被註冊到多個選擇器上,但對每個選擇器而言只能被註冊一次。

選擇鍵(SelectionKey)

選擇鍵封裝了特定的通道與特定的選擇器的註冊關係。選擇鍵物件被SelectableChannel.register()返回並提供一個表示這種註冊關係的標記。選擇鍵包含了兩個位元集(以整數的形式進行編碼),指示了該註冊關係所關心的通道操作,以及通道已經準備好的操作。

下面是使用Selector管理多個channel的結構圖:
這裡寫圖片描述

2. Selector的使用

1 建立Selector

Selector物件是通過呼叫靜態工廠方法open()來例項化的,如下:

Selector Selector=Selector.open();

類方法open()實際上向SPI1發出請求,通過預設的SelectorProvider物件獲取一個新的例項。

2 將Channel註冊到Selector

要實現Selector管理Channel,需要將channel註冊到相應的Selector上,如下:

channel.configureBlocking(false
); SelectionKey key= channel.register(selector,SelectionKey,OP_READ);

通過呼叫通道的register()方法會將它註冊到一個選擇器上。與Selector一起使用時,Channel必須處於非阻塞模式下,否則將丟擲IllegalBlockingModeException異常,這意味著不能將FileChannel與Selector一起使用,因為FileChannel不能切換到非阻塞模式,而套接字通道都可以。另外通道一旦被註冊,將不能再回到阻塞狀態,此時若呼叫通道的configureBlocking(true)將丟擲BlockingModeException異常。

register()方法的第二個引數是“interest集合”,表示選擇器所關心的通道操作,它實際上是一個表示選擇器在檢查通道就緒狀態時需要關心的操作的位元掩碼。比如一個選擇器對通道的read和write操作感興趣,那麼選擇器在檢查該通道時,只會檢查通道的read和write操作是否已經處在就緒狀態。
它有以下四種操作型別:

  • Connect 連線
  • Accept 接受
  • Read 讀
  • Write 寫

需要注意並非所有的操作在所有的可選擇通道上都能被支援,比如ServerSocketChannel支援Accept,而SocketChannel中不支援。我們可以通過通道上的validOps()方法來獲取特定通道下所有支援的操作集合。

JAVA中定義了四個常量來表示這四種操作型別:

SelectionKey.OP_CONNECT
SelectionKey.OP_ACCEPT
SelectionKey.OP_READ
SelectionKey.OP_WRITE

如果Selector對通道的多操作型別感興趣,可以用“位或”操作符來實現:int interestSet=SelectionKey.OP_READ|SelectionKey.OP_WRITE;
當通道觸發了某個操作之後,表示該通道的某個操作已經就緒,可以被操作。因此,某個SocketChannel成功連線到另一個伺服器稱為“連線就緒”(OP_CONNECT)。一個ServerSocketChannel準備好接收新進入的連線稱為“接收就緒”(OP_ACCEPT)。一個有資料可讀的通道可以說是“讀就緒”(OP_READ)。等待寫資料的通道可以說是“寫就緒”(OP_WRITE)。

我們注意到register()方法會返回一個SelectionKey物件,我們稱之為鍵物件。該物件包含了以下四種屬性:

  • interest集合
  • read集合
  • Channel
  • Selector

interest集合是Selector感興趣的集合,用於指示選擇器對通道關心的操作,可通過SelectionKey物件的interestOps()獲取。最初,該興趣集合是通道被註冊到Selector時傳進來的值。該集合不會被選擇器改變,但是可通過interestOps()改變。我們可以通過以下方法來判斷Selector是否對Channel的某種事件感興趣:

   int interestSet=selectionKey.interestOps();
   boolean isInterestedInAccept  = (interestSet & SelectionKey.OP_ACCEPT) == SelectionKey.OP_ACCEPT;

read集合是通道已經就緒的操作的集合,表示一個通道準備好要執行的操作了,可通過SelctionKey物件的readyOps()來獲取相關通道已經就緒的操作。它是interest集合的子集,並且表示了interest集合中從上次呼叫select()以後已經就緒的那些操作。(比如選擇器對通道的read,write操作感興趣,而某時刻通道的read操作已經準備就緒可以被選擇器獲知了,前一種就是interest集合,後一種則是read集合。)。JAVA中定義以下幾個方法用來檢查這些操作是否就緒:

    //int readSet=selectionKey.readOps();
    selectionKey.isAcceptable();//等價於selectionKey.readyOps()&SelectionKey.OP_ACCEPT
    selectionKey.isConnectable();
    selectionKey.isReadable();
    selectionKey.isWritable();

需要注意的是,通過相關的選擇鍵的readyOps()方法返回的就緒狀態指示只是一個提示,底層的通道在任何時候都會不斷改變,而其他執行緒也可能在通道上執行操作並影響到它的就緒狀態。另外,我們不能直接修改read集合。

取出SelectionKey所關聯的Selector和Channel
通過SelectionKey訪問對應的Selector和Channel:

Channel channel =selectionKey.channel();
Selector selector=selectionKey.selector();

關於取消SelectionKey物件的那點事
我們可以通過SelectionKey物件的cancel()方法來取消特定的註冊關係。該方法呼叫之後,該SelectionKey物件將會被”拷貝”至已取消鍵的集合中,該鍵此時已經失效,但是該註冊關係並不會立刻終結。在下一次select()時,已取消鍵的集合中的元素會被清除,相應的註冊關係也真正終結。

3 為SelectionKey繫結附加物件

可以將一個或者多個附加物件繫結到SelectionKey上,以便容易的識別給定的通道。通常有兩種方式:
1. 在註冊的時候直接繫結:
SelectionKey key=channel.register(selector,SelectionKey.OP_READ,theObject);
2. 在繫結完成之後附加:
selectionKey.attach(theObject);//繫結

繫結之後,可通過對應的SelectionKey取出該物件:
selectionKey.attachment();
如果要取消該物件,則可以通過該種方式:
selectionKey.attach(null).

需要注意的是如果附加的物件不再使用,一定要人為清除,因為垃圾回收器不會回收該物件,若不清除的話會成記憶體洩漏。

一個單獨的通道可被註冊到多個選擇器中,有些時候我們需要通過isRegistered()方法來檢查一個通道是否已經被註冊到任何一個選擇器上。 通常來說,我們並不會這麼做。

4 通過Selector選擇通道

我們知道選擇器維護註冊過的通道的集合,並且這種註冊關係都被封裝在SelectionKey當中。接下來我們簡單的瞭解一下Selector維護的三種類型SelectionKey集合:

已註冊的鍵的集合(Registered key set)

所有與選擇器關聯的通道所生成的鍵的集合稱為已經註冊的鍵的集合。並不是所有註冊過的鍵都仍然有效。這個集合通過keys()方法返回,並且可能是空的。這個已註冊的鍵的集合不是可以直接修改的;試圖這麼做的話將引發java.lang.UnsupportedOperationException。

已選擇的鍵的集合(Selected key set)

已註冊的鍵的集合的子集。這個集合的每個成員都是相關的通道被選擇器(在前一個選擇操作中)判斷為已經準備好的,並且包含於鍵的interest集合中的操作。這個集合通過selectedKeys()方法返回(並有可能是空的)。
不要將已選擇的鍵的集合與ready集合弄混了。這是一個鍵的集合,每個鍵都關聯一個已經準備好至少一種操作的通道。每個鍵都有一個內嵌的ready集合,指示了所關聯的通道已經準備好的操作。鍵可以直接從這個集合中移除,但不能新增。試圖向已選擇的鍵的集合中新增元素將丟擲java.lang.UnsupportedOperationException。

已取消的鍵的集合(Cancelled key set)

已註冊的鍵的集合的子集,這個集合包含了cancel()方法被呼叫過的鍵(這個鍵已經被無效化),但它們還沒有被登出。這個集合是選擇器物件的私有成員,因而無法直接訪問。

在剛初始化的Selector物件中,這三個集合都是空的。通過Selector的select()方法可以選擇已經準備就緒的通道(這些通道包含你感興趣的的事件)。比如你對讀就緒的通道感興趣,那麼select()方法就會返回讀事件已經就緒的那些通道。下面是Selector幾個過載的select()方法:
select():阻塞到至少有一個通道在你註冊的事件上就緒了。
select(long timeout):和select()一樣,但最長阻塞事件為timeout毫秒。
selectNow():非阻塞,只要有通道就緒就立刻返回。

select()方法返回的int值表示有多少通道已經就緒,是自上次呼叫select()方法後有多少通道變成就緒狀態。之前在select()呼叫時進入就緒的通道不會在本次呼叫中被記入,而在前一次select()呼叫進入就緒但現在已經不在處於就緒的通道也不會被記入。例如:首次呼叫select()方法,如果有一個通道變成就緒狀態,返回了1,若再次呼叫select()方法,如果另一個通道就緒了,它會再次返回1。如果對第一個就緒的channel沒有做任何操作,現在就有兩個就緒的通道,但在每次select()方法呼叫之間,只有一個通道就緒了。

一旦呼叫select()方法,並且返回值不為0時,則可以通過呼叫Selector的selectedKeys()方法來訪問已選擇鍵集合。如下:
Set selectedKeys=selector.selectedKeys();
進而可以放到和某SelectionKey關聯的Selector和Channel。如下所示:

Set selectedKeys = selector.selectedKeys();
Iterator keyIterator = selectedKeys.iterator();
while(keyIterator.hasNext()) {
    SelectionKey key = keyIterator.next();
    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.
    } else if (key.isConnectable()) {
        // a connection was established with a remote server.
    } else if (key.isReadable()) {
        // a channel is ready for reading
    } else if (key.isWritable()) {
        // a channel is ready for writing
    }
    keyIterator.remove();
}

關於Selector執行選擇的過程

我們知道呼叫select()方法進行通道,現在我們再來深入一下選擇的過程,也就是select()執行過程。當select()被呼叫時將執行以下幾步:

  1. 首先檢查已取消鍵集合,也就是通過cancle()取消的鍵。如果該集合不為空,則清空該集合裡的鍵,同時該集合中每個取消的鍵也將從已註冊鍵集合和已選擇鍵集合中移除。(一個鍵被取消時,並不會立刻從集合中移除,而是將該鍵“拷貝”至已取消鍵集合中,這種取消策略就是我們常提到的“延遲取消”。)
  2. 再次檢查已註冊鍵集合(準確說是該集合中每個鍵的interest集合)。系統底層會依次詢問每個已經註冊的通道是否準備好選擇器所感興趣的某種操作,一旦發現某個通道已經就緒了,則會首先判斷該通道是否已經存在在已選擇鍵集合當中,如果已經存在,則更新該通道在已註冊鍵集合中對應的鍵的ready集合,如果不存在,則首先清空該通道的對應的鍵的ready集合,然後重設ready集合,最後將該鍵存至已註冊鍵集合中。這裡需要明白,當更新ready集合時,在上次select()中已經就緒的操作不會被刪除,也就是ready集合中的元素是累積的,比如在第一次的selector對某個通道的read和write操作感興趣,在第一次執行select()時,該通道的read操作就緒,此時該通道對應的鍵中的ready集合存有read元素,在第二次執行select()時,該通道的write操作也就緒了,此時該通道對應的ready集合中將同時有read和write元素。

深入已註冊鍵集合的管理

到現在我們已經知道一個通道的的鍵是如何被新增到已選擇鍵集合中的,下面我們來繼續瞭解對已選擇鍵集合的管理 。首先要記住:選擇器不會主動刪除被新增到已選擇鍵集合中的鍵,而且被新增到已選擇鍵集合中的鍵的ready集合只能被設定,而不能被清理。如果我們希望清空已選擇鍵集合中某個鍵的ready集合該怎麼辦?我們知道一個鍵在新加入已選擇鍵集合之前會首先置空該鍵的ready集合,這樣的話我們可以人為的將某個鍵從已註冊鍵集合中移除最終實現置空某個鍵的ready集合。被移除的鍵如果在下一次的select()中再次就緒,它將會重新被新增到已選擇的鍵的集合中。這就是為什麼要在每次迭代的末尾呼叫keyIterator.remove()

5 停止選擇

選擇器執行選擇的過程,系統底層會依次詢問每個通道是否已經就緒,這個過程可能會造成呼叫執行緒進入阻塞狀態,那麼我們有以下三種方式可以喚醒在select()方法中阻塞的執行緒。

  1. 通過呼叫Selector物件的wakeup()方法讓處在阻塞狀態的select()方法立刻返回
    該方法使得選擇器上的第一個還沒有返回的選擇操作立即返回。如果當前沒有進行中的選擇操作,那麼下一次對select()方法的一次呼叫將立即返回。

  2. 通過close()方法關閉Selector**
    該方法使得任何一個在選擇操作中阻塞的執行緒都被喚醒(類似wakeup()),同時使得註冊到該Selector的所有Channel被登出,所有的鍵將被取消,但是Channel本身並不會關閉。

  3. 呼叫interrupt()
    呼叫該方法會使睡眠的執行緒丟擲InterruptException異常,捕獲該異常並在呼叫wakeup()

上面有些人看到“系統底層會依次詢問每個通道”時可能在想如果已選擇鍵非常多是,會不會耗時較長?答案是肯定的。但是我想說的是通常你可以選擇忽略該過程,至於為什麼,後面再說。

3. Selector完整例項

這裡我們結合ServerSocketChannel和Selector構建簡單的伺服器,下面是完整的程式碼示例。
服務端程式碼:

public class ServerSocketChannelTest {

    private int size = 1024;
    private ServerSocketChannel socketChannel;
    private ByteBuffer byteBuffer;
    private Selector selector;
    private final int port = 8998;
    private int remoteClientNum=0;

    public ServerSocketChannelTest() {
        try {
            initChannel();
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(-1);
        }
    }

    public void initChannel() throws Exception {
        socketChannel = ServerSocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.bind(new InetSocketAddress(port));
        System.out.println("listener on port:" + port);
        selector = Selector.open();
        socketChannel.register(selector, SelectionKey.OP_ACCEPT);
        byteBuffer = ByteBuffer.allocateDirect(size);
        byteBuffer.order(ByteOrder.BIG_ENDIAN);
    }

    private void listener() throws Exception {
        while (true) {
            int n = selector.select();
            if (n == 0) {
                continue;
            }
            Iterator<SelectionKey> ite = selector.selectedKeys().iterator();
            while (ite.hasNext()) {
                SelectionKey key = ite.next();
                //a connection was accepted by a ServerSocketChannel.
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    registerChannel(selector, channel, SelectionKey.OP_READ);
                    remoteClientNum++;
                    System.out.println("online client num="+remoteClientNum);
                    replyClient(channel);
                }
                //a channel is ready for reading
                if (key.isReadable()) {
                    readDataFromSocket(key);
                }

                ite.remove();//must
            }

        }
    }

    protected void readDataFromSocket(SelectionKey key) throws Exception {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;
        byteBuffer.clear();
        while ((count = socketChannel.read(byteBuffer)) > 0) {
            byteBuffer.flip(); // Make buffer readable
            // Send the data; don't assume it goes all at once
            while (byteBuffer.hasRemaining()) {
                socketChannel.write(byteBuffer);
            }
            byteBuffer.clear(); // Empty buffer
        }
        if (count < 0) {
            socketChannel.close();
        }
    }

    private void replyClient(SocketChannel channel) throws IOException {
        byteBuffer.clear();
        byteBuffer.put("hello client!\r\n".getBytes());
        byteBuffer.flip();
        channel.write(byteBuffer);
    }

    private void registerChannel(Selector selector, SocketChannel channel, int ops) throws Exception {
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }


    public static void main(String[] args) {
        try {
            new ServerSocketChannelTest().listener();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

客戶端程式碼:

public class SocketChannelTest {

    private int size = 1024;
    private ByteBuffer byteBuffer;
    private SocketChannel socketChannel;

    public void connectServer() throws IOException {
        socketChannel = SocketChannel.open();
        socketChannel.connect(new InetSocketAddress("127.0.0.1", 8998));
        byteBuffer = ByteBuffer.allocate(size);
        byteBuffer.order(ByteOrder.BIG_ENDIAN);
        receive();
    }

    private void receive() throws IOException {
        while (true) {
            int count;
            byteBuffer.clear();
            while ((count = socketChannel.read(byteBuffer)) > 0) {
                byteBuffer.flip();
                while (byteBuffer.hasRemaining()) {
                    System.out.print((char) byteBuffer.get());
                }
                //send("send data to server\r\n".getBytes());
                byteBuffer.clear();
            }
        }
    }

    private void send(byte[] data) throws IOException {
        byteBuffer.clear();
        byteBuffer.put(data);
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
    }

    public static void main(String[] args) throws IOException {
        new SocketChannelTest().connectServer();
    }
}
  1. 是Serial Peripheral Interface的縮寫,代表序列外圍裝置介面。