1. 程式人生 > >《Java 原始碼分析》:Java NIO 之 Selector(第一部分Selector.open())

《Java 原始碼分析》:Java NIO 之 Selector(第一部分Selector.open())

《Java 原始碼分析》 :Java NIO 之 Selector(第一部分Selector.open())

關於Selector類主要涉及兩個重要的方法,如下:

1、Selector.open()

2、select()

由於篇幅限制,這篇主要從原始碼的角度來介紹Selector selector = Selector.open()背後主要做了什麼,發生了什麼。

Selector類中的open()原始碼如下:

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

函式功能:開啟一個選擇器。這個新的選擇器的是通過呼叫系統級預設 SelectorProvider 物件的 openSelector 方法來建立的。

SelectorProvider.provider().openSelector();這行程式碼中我們先看SelectorProvider.provider();具體做了些什麼。

provider()方法的原始碼如下:

    public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null
)//保證只有一個provider物件例項 return provider; return AccessController.doPrivileged( new PrivilegedAction<SelectorProvider>() { public SelectorProvider run() { if (loadProviderFromProperty()) return
provider; if (loadProviderAsService()) return provider; provider = sun.nio.ch.DefaultSelectorProvider.create(); return provider; } }); } }

此函式功能:返回此時呼叫JVM的系統級預設的SelectorProvider

在程式碼中

    而if (provider != null)
            returnprovider;

是用來保證了整個程式中只有一個WindowsSelectorProvider物件;

由於我自己看的原始碼不是openjdk,因此在類庫中根本就沒有sun.nio.ch這個包。

provider由sun.nio.ch.DefaultSelectorProvider.create();建立

sun.nio.ch.DefaultSelectorProvider類的原始碼如下:

    /**
       * Creates this platform's default SelectorProvider
       */

      public class DefaultSelectorProvider {

          /**
           * Prevent instantiation.
           */
          private DefaultSelectorProvider() { }

          /**
           * Returns the default SelectorProvider.
           */
          public static SelectorProvider create() {
              return new sun.nio.ch.WindowsSelectorProvider();
          }

      }

所以從上面我們就可以看到:DefaultSelectorProvider 類只有一個私有的建構函式和一個create方法。其中在類 SelectorProvider 中的provider()方法中
provider = sun.nio.ch.DefaultSelectorProvider.create();會根據作業系統來返回不同的實現類,windows平臺就返回WindowsSelectorProvider物件例項;

以上就是 SelectorProvider.provider() 產生的一個 SelectorProvider (子類 WindowsSelectorProvider)物件例項的過程。

結論:有上面我們知道Selector.open()方法中的SelectorProvider.provider()實際上就是例項化了一個WindowsSelectorProvider物件,其中WindowsSelectorProvider為SelectorProvider的子類。

    public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }

有了上面的基礎,我們接著看上面程式碼塊中的後面一半:SelectorProvider.openSelector()方法的具體實現過程。

SelectorProvider.provider().openSelector();根據前面的分析實際上就是 WindowsSelectorProvider. openSelector()。

因此,下面主要看下:WindowsSelectorProvider. openSelector()這個主要做了些什麼操作。

WindowsSelectorProvider類的原始碼如下:

 public class WindowsSelectorProvider extends SelectorProviderImpl {

          public AbstractSelector openSelector() throws IOException {
             return new WindowsSelectorImpl(this);
         }
  }

因此 WindowsSelectorProvider. openSelector()實現的邏輯就是直接例項化 WindowsSelectorImpl物件。

以上的思路還是相當清晰且容易理解的哈。簡單來說:Selector selector = Selector.open();實際上就是new 了一個 WindowsSelectorImpl物件例項。

既然是例項化一個 WindowsSelectorImpl。因此,我們這有必要看下這個類的建構函式。

這個類才是我們要關注的重點:WindowsSelectorImpl

繼承關係如下:

final class WindowsSelectorImpl extends SelectorImpl

其建構函式為:

      WindowsSelectorImpl(SelectorProvider sp) throws IOException {
             super(sp);
             pollWrapper = new PollArrayWrapper(INIT_CAP);
             wakeupPipe = Pipe.open();
             wakeupSourceFd = ((SelChImpl)wakeupPipe.source()).getFDVal();

             // Disable the Nagle algorithm so that the wakeup is more immediate
             SinkChannelImpl sink = (SinkChannelImpl)wakeupPipe.sink();
             (sink.sc).socket().setTcpNoDelay(true);
             wakeupSinkFd = ((SelChImpl)sink).getFDVal();

             pollWrapper.addWakeupSocket(wakeupSourceFd, 0);
         }

這段程式碼中做了如下幾個事情

1、Pipe.open()開啟一個管道(開啟管道的實現後面再看);

2、拿到wakeupSourceFd和wakeupSinkFd兩個檔案描述符;

3、把喚醒端的檔案描述符(wakeupSourceFd)放到pollWrapper裡;PollArrayWrapper類會在文章後面進行介紹。

那麼為什麼需要一個管道,這個管道是怎麼實現的?接下來看Pipe.open()做了什麼

Pipe類在 java.nio.channels包下面

    //函式功能:開啟一個管道
    public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
    }

Pipe.open()方法直接是 呼叫了 SelectorProvider.openPipe()方法來實現的。

進一步來看 SelectorProvider.openPipe() 這個方法。

在SelectorProvider中的openPipe()是一個抽象方法,如下:

public abstract Pipe openPipe()
    throws IOException;

由於SelectorProvider.provider()實際上返回的是SelectorProvider的子類WindowSelectorProvider的例項物件。沿著 WindowsSelectorProvider的繼承關係找了下,SelectorProvider中的openPipe()抽象方法
是在 SelectorProviderImpl類中實現的,即是在 WindowSelectorProvider類的父類來實現的。

這裡有必要說下 WindowsSelectorProvider的繼承關係:

WindowSelectorProvider 的直接父類是 SelectorProviderImpl;SelectorProviderImpl 的直接父類是 SelectorProvider。

SelectorProviderImpl 類的程式碼如下:

      public abstract class SelectorProviderImpl
              extends SelectorProvider
          {
                //...省略了一些不相干的函式
              public Pipe openPipe() throws IOException {
                  return new PipeImpl(this);
              }
               //...
          }

從上面可知:

開啟管道Pipe.open()方法 直接呼叫的 SelectorProvider.openPipe()方法,而SelectorProvider類中的openPipe()方法 直接返回的是:new PipeImpl(this),即PipeImpl類的物件例項。

看到這裡的時候,我在想為什麼不從最開始的Pipe類的open()中直接返回PipeImpl的例項物件呢,而是要委託給SelectorProviderImpl(具體程式碼看下面)呢,原因可能在於PipeImpl例項需要一個WindowsSelectorProvider且所有環境有且只有一個,如果不採用這種方式可能會更復雜,不想了,繼續往後面看

Pipe類的open()方法

    //函式功能:開啟一個管道
    public static Pipe open() throws IOException {
        return SelectorProvider.provider().openPipe();
    }

經過上面的分析,我們已經知道了Pipe.open()在程式碼層面的表現為:例項化了一個PipeImpl物件。

下面看下PipeImpl 類的建構函式


        PipeImpl(final SelectorProvider sp) throws IOException {
             try {
                 AccessController.doPrivileged(new Initializer(sp));
             } catch (PrivilegedActionException x) {
                 throw (IOException)x.getCause();
             }
         }

這個構造方法中的程式碼雖然比較不熟悉,是自己第一次見到,但是我們還是要想辦法來看下,是吧。

先不看Initializer這個類裡面的具體實現,我們來看下PipeImpl類的建構函式中

AccessController.doPrivileged(new Initializer(sp))
這行程式碼中所涉及的:dePrivileged這個方法是幹什麼的?

AccessController 類中的 doPrivileged(PrivilegedAction action) 方法是一個native方法,如下:

@CallerSensitive
public static native <T> T doPrivileged(PrivilegedAction<T> action);

關於 AccessController.doPrivileged方法的介紹,可以參考下篇博文:

看了一些關於AccessController.doPrivileged的資料,還沒有怎麼懂,但是可以這裡來理解:

首先:AccessController.doPrivileged意思是這個是特別的,不用做許可權檢查.

在什麼地方會用到呢

答:假設1.jar中有類可以讀取一個檔案,現在我們要使用1.jar去做這個事情.
但是我們的類本生是沒有許可權去讀取那個檔案的,一般情況下就是眼睜睜的看著了. 但是java提供了doPrivileged.在1.jar中如果讀取檔案的方法是通過doPrivileged來實現的.
就不會有後面的檢查了,現在我們就可以使用1.jar去讀取那個檔案了.

利用doPrivileged就實現了沒有許可權的人借用有許可權的人來達到一定的目的。

回到原題:

AccessController.doPrivileged(new Initializer(sp)) 經過許可權的檢查之後就會直接執行Initializer中的run方法
,下面來看下Initializer這個類中的run方法。

Initializer是PipeImpl類的內部類,原始碼如下:

       private class Initializer
              implements PrivilegedExceptionAction<Void>
          {

              private final SelectorProvider sp;

              private Initializer(SelectorProvider sp) {
                  this.sp = sp;
              }

              public Void run() throws IOException {
                  ServerSocketChannel ssc = null;
                  SocketChannel sc1 = null;
                  SocketChannel sc2 = null;

                  try {
                      // loopback address
                      InetAddress lb = InetAddress.getByName("127.0.0.1");
                      assert(lb.isLoopbackAddress());

                      // bind ServerSocketChannel to a port on the loopback address
                      // 將ServerSocketChannel繫結本地環回地址,埠號為 0
                      ssc = ServerSocketChannel.open();
                      ssc.socket().bind(new InetSocketAddress(lb, 0));

                      // Establish connection (assumes connections are eagerly
                      // accepted)
                      // 建立連線
                      InetSocketAddress sa
                          = new InetSocketAddress(lb, ssc.socket().getLocalPort());
                      sc1 = SocketChannel.open(sa);
                      //向SocketChannel中寫入資料
                      ByteBuffer bb = ByteBuffer.allocate(8);
                     long secret = rnd.nextLong();
                     bb.putLong(secret).flip();
                     sc1.write(bb);

                     // Get a connection and verify it is legitimate
                     for (;;) {
                         sc2 = ssc.accept();
                         bb.clear();
                         sc2.read(bb);
                         bb.rewind();
                         if (bb.getLong() == secret)
                             break;
                         sc2.close();
                     }

                     // Create source and sink channels
                     source = new SourceChannelImpl(sp, sc1);
                     sink = new SinkChannelImpl(sp, sc2);
                 } catch (IOException e) {
                     try {
                         if (sc1 != null)
                             sc1.close();
                         if (sc2 != null)
                             sc2.close();
                     } catch (IOException e2) { }
                     IOException x = new IOException("Unable to establish"
                                                     + " loopback connection");
                     x.initCause(e);
                     throw x;
                 } finally {
                     try {
                         if (ssc != null)
                             ssc.close();
                     } catch (IOException e2) { }
                 }
                 return null;
             }
         }

從 Initializer中run方法中,我們可以得到的一點是:建立了一個 loopback connection.

windows下的實現是建立兩個本地的socketChannel,然後連線(連結的過程通過寫一個隨機long做兩個socket的連結校驗),兩個socketChannel分別實現了管道的source與sink端。
source端由前面提到的WindowsSelectorImpl放到了pollWrapper中(pollWrapper.addWakeupSocket(wakeupSourceFd, 0))

最後,看下PollArrayWrapper這個類

PollArrayWrapper類主要在前面的WindowsSelectorImpl的建構函式中有這樣一行程式碼:pollWrapper.addWakeupSocket(wakeupSourceFd, 0)(作用:把喚醒端的檔案描述符(wakeupSourceFd)放到pollWrapper裡)

PollArrayWrapper類中addWakeupSocket方法的原始碼如下:

    // Adds Windows wakeup socket at a given index.
    void addWakeupSocket(int fdVal, int index) {
        putDescriptor(index, fdVal);
        putEventOps(index, POLLIN);
    }
    // Access methods for fd structures
    void putDescriptor(int i, int fd) {
        pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
    }

    void putEventOps(int i, int event) {
        pollArray.putShort(SIZE_POLLFD * i + EVENT_OFFSET, (short)event);
    }

這裡將source的POLLIN事件標識為感興趣的,當sink端有資料寫入時,source對應的檔案描述符wakeupSourceFd就會處於就緒狀態

到這裡從原始碼的角度來看了Selector selector = Selector.open()主要做了些什麼

主要完成建立Pipe,並把pipe的wakeupSourceFd放入pollArray中,這個pollArray是Selector的樞紐。這裡是以Windows的實現來看,在windows下通過兩個連結的socketChannel實現了Pipe,linux下則是直接使用系統的pipe。

小結

Selector selector = Selector.open();實際上就是new 了一個 WindowsSelectorImpl物件例項。

以及建立了Pipe,並把pipe的wakeupSourceFd放入pollArray中,這個pollArray是Selector的樞紐。這裡是以Windows的實現來看,在windows下通過兩個連結的socketChannel實現了Pipe,linux下則是直接使用系統的pipe。

關於第二部分就是從原始碼的角度來看下selector.select()背後做了些什麼,敬請期待。

最後上一張關於Selector工作原理的圖:(來源自參考資料所貼出的部落格)

這張圖自己目前也還沒有全部弄懂(只知道大概流程確實是這樣),有待於自己進一步的接觸後才能更好的理解。

參考資料