1. 程式人生 > >NIO解讀之多路複用器Selector

NIO解讀之多路複用器Selector

Selector類的結構圖如下所示:



Selector是JDK的NIO中最重要的類之一,當我們通過Selector.open()方法開啟一個多路複用器的時候實際上執行的open方法為

public static Selector open() throws IOException {

    return SelectorProvider.provider().openSelector();

}

呼叫了SelectorProvider.provider()方法,方法程式碼如下:

    public static SelectorProvider provider() {

        synchronized (lock) {

            if (provider != null)

                return provider;

            return AccessController.doPrivileged(

                new PrivilegedAction<SelectorProvider>() {

                    public SelectorProvider run() {

                            if (loadProviderFromProperty())

                                return provider;

                            if (loadProviderAsService())

                                return provider;

                            provider = sun.nio.ch.DefaultSelectorProvider.create();

                            return provider;

                        }

                    });

        }

    }

可以看出這個方法首先通過加鎖保證了靜態的屬性provider 如果不為空,才去建立一個,這個方法對於不同的作業系統平臺會返回不同的例項,對於Windows返回的就是WindowsSelectorProvider這個Provider

然後呼叫了WindowsSelectorProvider的openSelector方法建立了WindowsSelectorImpl也就是我們真正的多路複用器實現類,接著WindowsSelectorImpl的構造方法:

    WindowsSelectorImpl(SelectorProvider sp) throws IOException {

        super(sp);      

        pollWrapper = new PollArrayWrapper(INIT_CAP);

        wakeupPipe = Pipe.open();

        wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

        // Disable the Nagle algorithm so that the wakeup is more immediate

        SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();

        (sink.sc).socket().setTcpNoDelay(true);

        wakeupSinkFd = ((SelChImpl)sink).getFDVal();

        pollWrapper.addWakeupSocket(wakeupSourceFd, 0);

    }

首先通過 super(sp);  呼叫了父類的SelectorImpl的構造方法將SelectorProvider的實現類WindowsSelectorProvider傳入,SelectorImpl的構造方法如下:

    protected SelectorImpl(SelectorProvider sp) {

        super(sp);

        keys = new HashSet<SelectionKey>();

        selectedKeys = new HashSet<SelectionKey>();

        if (Util.atBugLevel("1.4")) {

            publicKeys = keys;

            publicSelectedKeys = selectedKeys;

        } else {

            publicKeys = Collections.unmodifiableSet(keys);

            publicSelectedKeys = Util.ungrowableSet(selectedKeys);

        }

    }

他的第一行程式碼也呼叫了父類AbstractSelector的構造方法:程式碼如下:

    protected AbstractSelector(SelectorProvider provider) {

        this.provider = provider;

    }

他只是將SelectorProvider 的實現類儲存到了自己的例項變數中,可見AbstractSelector類中儲存了SelectorProvider 的實現,接著看SelectorImpl構造方法的其他程式碼:

他構造了keys和selectedKeys變數。接著看WindowsSelectorImpl的構造方法:

接著構造了pollWrapper變數,接著看wakeupPipe = Pipe.open();這是一行關鍵的程式碼,看看Pipe.open()方法,程式碼如下:

    public static Pipe open() throws IOException {

        return SelectorProvider.provider().openPipe();

    }

他同樣也是通過SelectorProvider.provider()建立的上文說過這個物件的建立,這個方法定義在SelectorProviderImpl這個abstract類中,同樣幾個其他的DatagramChannel,ServerSocketChannel,SocketChannel都是定義在SelectorProviderImpl這個的類中,看上文的繼承結構圖可以看出這個類是WindowsSelectorProvider的父類,實際上WindowsSelectorProvider只是實現了抽象類SelectorProviderImpl中的一個WindowsSelectorProvider方法而已,SelectorProviderImpl的openPipe方法如下:

    public Pipe openPipe() throws IOException {

        return new PipeImpl(this);

    }

接著看PipeImpl的構造方法,Pipe類的繼承結構如下所示:

PipeImpl類的構造方法如下:

    PipeImpl(SelectorProvider sp) {

        long pipeFds = IOUtil.makePipe(true);

        int readFd = (int) (pipeFds >>> 32);

        int writeFd = (int) pipeFds;

        FileDescriptor sourcefd = new FileDescriptor();

        IOUtil.setfdVal(sourcefd, readFd);

        source = new SourceChannelImpl(sp, sourcefd);

        FileDescriptor sinkfd = new FileDescriptor();

        IOUtil.setfdVal(sinkfd, writeFd);

        sink = new SinkChannelImpl(sp, sinkfd);

    }

首先呼叫IOUtil.makePipe這個native方法,通過他的註釋我們可以看出他建立了一個pipe管道的兩個檔案描述符物件,read端是返回值的高32位,write端是返回值的低32位,

    /**

     * Returns two file descriptors for a pipe encoded in a long.

     * The read end of the pipe is returned in the high 32 bits,

     * while the write end is returned in the low 32 bits.

     */

    static native long makePipe(boolean blocking);

接下來看PipeImpl的構造方法中將返回的檔案描述符的地址進行了相應的賦值,接著建立了一個sourcefd 的檔案描述符物件,將他與readFd關聯上,將sinkfd檔案描述符物件和writeFd關聯

接著建立了SourceChannelImpl和SinkChannelImpl物件,SourceChannelImpl類就對應了pipe讀一端的channel,SinkChannelImpl類就對應了pipe寫一端的channel.

這樣PipeImpl物件建立完畢返回賦值帶WindowsSelectorImpl類的wakeupPipe屬性,WindowsSelectorImpl的wakeupSourceFd屬性就對應了剛才建立的Pipe物件的source,

wakeupSinkFd屬性就對應了剛才建立的Pipe物件的sink

接著將wakeupSourceFd這個檔案描述符加入到pollWrapper物件中,構造方法就結束了。

pollWrapper物件中儲存的檔案描述符物件就是呼叫多路複用器select方法時作業系統要掃描的檔案描述符列表。

其實WindowsSelectorImpl建立的Pipe物件的就是為了自己喚醒自己而已,對於呼叫了多路複用器物件的select方法時,是一直阻塞的,實際上作業系統就是在輪訓pollWrapper物件中註冊的檔案描述符物件。試想一下如果這個時候想加入一個新的Channel,那麼勢必得讓select方法返回,一個阻塞在select上的執行緒有以下三種方式可以被喚醒:

1)有資料可讀/寫,或出現異常。

2)阻塞時間到,即time out

3)收到一個non-block的訊號。可由killpthread_kill發出。

1)第二種方法可以排除,因為select一旦阻塞,應無法修改其time out時間。

2)而第三種看來只能在Linux上實現,Windows上沒有這種訊號通知的機制。

所以,看來只有第一種方法了。再回想到為什麼每個Selector.open(),在Windows會建立一對自己和自己的loopbackTCP連線;在Linux上會開一對pipepipeLinux下一般都是成對開啟),估計我們能夠猜得出來——那就是如果想要喚醒select,只需要朝著自己的這個loopback連線發點資料過去,於是,就可以喚醒阻塞在select上的執行緒了。

這時再來看看WindowsSelectorImpl. Wakeup():

    public Selector wakeup() {

        synchronized (interruptLock) {

            if (!interruptTriggered) {

                setWakeupSocket();

                interruptTriggered = true;

            }

        }

        return this;

    }

    private void setWakeupSocket() {

        setWakeupSocket0(wakeupSinkFd);

    }

    private native void setWakeupSocket0(int wakeupSinkFd);

可見wakeup()是通過pipewrite send(scoutFd, &byte, 1, 0),發生一個位元組1,來喚醒poll()。所以在需要的時候就可以呼叫selector.wakeup()來喚醒selector

對於windows,每當呼叫一次Selector的open方法就建立了兩個TCP的連結,一個Server綁定了一個隨機的埠號,一個client連線,server和client相連,如果要是實現wakeup,client就給這個server傳送一點兒資料就OK 了。

對於linux使用的是pipe管道來實現的。

下面來說說Selector.select方法:

    public int select() throws IOException {

        return select(0);

    }

    public int select(long timeout)

        throws IOException

    {

        if (timeout < 0)

            throw new IllegalArgumentException("Negative timeout");

        return lockAndDoSelect((timeout == 0) ? -1 : timeout);

    }

    private int lockAndDoSelect(long timeout) throws IOException {

        synchronized (this) {

            if (!isOpen())

                throw new ClosedSelectorException();

            synchronized (publicKeys) {

                synchronized (publicSelectedKeys) {

                    return doSelect(timeout);

                }

            }

        }

    }

他最後呼叫到了WindowsSelectorImpl的doSelect方法:

    protected int doSelect(long timeout) throws IOException {

        if (channelArray == null)

            throw new ClosedSelectorException();

        this.timeout = timeout; // set selector timeout

        processDeregisterQueue();

        if (interruptTriggered) {

            resetWakeupSocket();

            return 0;

        }

        // Calculate number of helper threads needed for poll. If necessary

        // threads are created here and start waiting on startLock

        adjustThreadsCount();

        finishLock.reset(); // reset finishLock

        // Wakeup helper threads, waiting on startLock, so they start polling.

        // Redundant threads will exit here after wakeup.

        startLock.startThreads();

        // do polling in the main thread. Main thread is responsible for

        // first MAX_SELECTABLE_FDS entries in pollArray.

        try {

            begin();

            try {

                subSelector.poll();

            } catch (IOException e) {

                finishLock.setException(e); // Save this exception

            }

            // Main thread is out of poll(). Wakeup others and wait for them

            if (threads.size() > 0)

                finishLock.waitForHelperThreads();

          } finally {

              end();

          }

        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.

        finishLock.checkForException();

        processDeregisterQueue();

        int updated = updateSelectedKeys();

        // Done with poll(). Set wakeupSocket to nonsignaled for the next run.

        resetWakeupSocket();

        return updated;

    }

來看看幾個關鍵的方法:

    private void adjustThreadsCount() {

        if (threadsCount > threads.size()) {

            // More threads needed. Start more threads.

            for (int i = threads.size(); i < threadsCount; i++) {

                SelectThread newThread = new SelectThread(i);

                threads.add(newThread);

                newThread.setDaemon(true);

                newThread.start();

            }

        } else if (threadsCount < threads.size()) {

            // Some threads become redundant. Remove them from the threads List.

            for (int i = threads.size() - 1 ; i >= threadsCount; i--)

                threads.remove(i).makeZombie();

        }

    }

在分析ServerSocketChannel的regist方法時分析過如果註冊的channel數量超過了1024那麼就要啟動一個新的幫助執行緒來出來,這個方法就是根據threadsCount屬性的值來啟動相應的執行緒,那麼建立的執行緒就是從一個索引的位置(1024,2048順序遞增)起輪訓pollWrapper對應的索引中的檔案描述符的,也就是呼叫doselct方法的主執行緒輪訓的是pollWrapper從0到1023索引中的fd,剩下的有子執行緒相應的處理,他們都是阻塞在

subSelector.poll();方法上

        private int poll() throws IOException{ // poll for the main thread

            return poll0(pollWrapper.pollArrayAddress,

                         Math.min(totalChannels, MAX_SELECTABLE_FDS),

                         readFds, writeFds, exceptFds, timeout);

        }

        private native int poll0(long pollAddress, int numfds,

             int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

當執行緒被喚醒時readFds,writeFds,exceptFds就被相應的賦值

主執行緒和子執行緒是通過startLock和finishLock來互動的,具體的就是如果主執行緒被喚醒了而沒有一個子執行緒被喚醒,那麼主執行緒就要等待至少一個子執行緒被喚醒,當有一個子執行緒被喚醒時他就喚醒其他的子執行緒和主執行緒一起返回。

接下來看看updateSelectedKeys方法:

    private int updateSelectedKeys() {

        updateCount++;

        int numKeysUpdated = 0;

        numKeysUpdated += subSelector.processSelectedKeys(updateCount);

        for (SelectThread t: threads) {

            numKeysUpdated += t.subSelector.processSelectedKeys(updateCount);

        }

        return numKeysUpdated;

    }

主執行緒和子執行緒都要呼叫到 subSelector.processSelectedKeys方法上,程式碼如下:

        private int processSelectedKeys(long updateCount) {

            int numKeysUpdated = 0;

            numKeysUpdated += processFDSet(updateCount, readFds,

                                           PollArrayWrapper.POLLIN,

                                           false);

            numKeysUpdated += processFDSet(updateCount, writeFds,

                                           PollArrayWrapper.POLLCONN |

                                           PollArrayWrapper.POLLOUT,

                                           false);

            numKeysUpdated += processFDSet(updateCount, exceptFds,

                                           PollArrayWrapper.POLLIN |

                                           PollArrayWrapper.POLLCONN |

                                           PollArrayWrapper.POLLOUT,

                                           true);

            return numKeysUpdated;

        }

他們就是將readFds,writeFds與註冊的SelectionKeyImpl物件關聯上設定到相應的事件儲存到SelectorImpl物件的selectedKeys屬性中