1. 程式人生 > >jdk 原始碼分析(20)java NIO包簡單分析

jdk 原始碼分析(20)java NIO包簡單分析

BIO 是一種阻塞的io,主要體現在: 1)accept 時候或者客戶端嘗試連線時是阻塞的, 2)資料讀寫是阻塞的,即使是沒有讀到資料,而且每次都是讀寫一個位元組。 對於服務端一般系統中常用的方式是沒接收一個請求new 一個thread,然後由這個handler去處理,如果請求數目過多new 的thread 將會很多,有的時候會建立一個執行緒池,將由執行緒池管理。 如果執行緒過多,線上程中相互切換會消耗大量的效能,而實際上這些執行緒並沒有做什麼事情。同時因為重連線到資料讀取結束是一個阻塞過程,如果網路,或者伺服器效能問題,會極大消耗客戶端的效能去等待服務端的返回結果。 基於以上問題,nio出來了, Buffer nio定義了很多的Buffer,buffer是一個物件,儲存需要寫入或者讀出的資料。底層就是一個數組。 下面是一個位的buffer。
  1. public abstract class ByteBuffer
  2. extends Buffer
  3. implements Comparable<ByteBuffer>
  4. {
  5. //陣列,資料直接放在堆區。
  6. final byte[] hb;//Non-null only for heap buffers
  7. final int offset;
  8. boolean isReadOnly;//Valid only for heap buffers
  9. ByteBuffer(int mark, int pos, int lim, int cap,// package-private
  10. byte[] hb, int offset)
  11. {
  12. super(mark, pos, lim, cap);
  13. this.hb = hb;
  14. this.offset = offset;
  15. }
下面是個floatBuffer的定義的陣列。
final float[] hb;
其分配方法:  ByteBuffer buffer = ByteBuffer.allocate(10); 
  1. public static
    ByteBuffer allocate(int capacity){
  2. if(capacity <0)
  3. throw new IllegalArgumentException();
  4. return new HeapByteBuffer(capacity, capacity);
  5. }
直接呼叫,在這裡new 的陣列,
  1. super(-1,0, lim, cap, new byte[cap],0);
channel
channel定義了一個管道(pile),用於資料讀寫,雙向的,可以同時讀寫,其 實現可以分為兩大類,FileChannel(檔案)和SelectorChannel
(網路)。nio 會用到ServiceSocketChannel 和SocketChannel。 1)ServerSocketChannel serverChannel = ServerSocketChannel.open();  
  1. public static ServerSocketChannel open() throws IOException{
  2. returnSelectorProvider.provider().openServerSocketChannel();
  3. }
這個SelectorProvider.provider() 會呼叫
  1. provider = sun.nio.ch.DefaultSelectorProvider.create();
而這個就是
  1. public static SelectorProvider create(){
  2. return new WindowsSelectorProvider();
  3. }
最後openServerSocketChannel 會new 一個ServerSocketChannelImpl
  1. new ServerSocketChannelImpl(this);
這個會刪除一個FileDescriptor
  1. ServerSocketChannelImpl(SelectorProvider var1) throws IOException{
  2. super(var1);
  3. this.fd =Net.serverSocket(true);
  4. this.fdVal =IOUtil.fdVal(this.fd);
  5. this.state =0;
  6. }
selector 選擇器(多路複用器),通過channel註冊個selector ,selector去查詢是是否有channel準備好資料了,如果有資料準備好了,將啟動新的執行緒處理。一般selector會輪詢方式查詢。 selector底層程式碼參考的openjdk。selector 同步open生存,
public static Selector open() throws IOException {
    return SelectorProvider.provider().openSelector();
}
底層是一個WindowsSelectorImpl
public AbstractSelector openSelector() throws IOException {
    return new WindowsSelectorImpl(this);
}
底層會定義一個pollWrapper 同時會啟動一個管道pipe
  1. WindowsSelectorImpl(SelectorProvider sp) throws IOException{
  2. super(sp);
  3. pollWrapper = new PollArrayWrapper(INIT_CAP);
  4. wakeupPipe =Pipe.open();
  5. wakeupSourceFd =((SelChImpl)wakeupPipe.source()).getFDVal();
  6. //Disable the Nagle algorithm so that the wakeup is more immediate
  7. SinkChannelImpl sink =(SinkChannelImpl)wakeupPipe.sink();
  8. (sink.sc).socket().setTcpNoDelay(true);
  9. wakeupSinkFd =((SelChImpl)sink).getFDVal();
  10. pollWrapper.addWakeupSocket(wakeupSourceFd,0);
  11. }
selector.select(); 底層主要操作 這個方法主要是查詢channel是否準備好,如果準備就將準備好的channel寫到publicSelectedKeys。
  1. protected int doSelect(long timeout) throws IOException{
  2. if(channelArray == null)
  3. throw new ClosedSelectorException();
  4. this.timeout = timeout;//set selector timeout
  5. processDeregisterQueue();
  6. if(interruptTriggered){
  7. resetWakeupSocket();
  8. return0;
  9. }
  10. //Calculate number of helper threads needed for poll.If necessary
  11. // threads are created here and start waiting on startLock
  12. adjustThreadsCount();
  13. finishLock.reset();// reset finishLock
  14. //Wakeup helper threads, waiting on startLock, so they start polling.
  15. //Redundant threads will exit here after wakeup.
  16. startLock.startThreads();
  17. //do polling in the main thread.Main thread is responsible for
  18. // first MAX_SELECTABLE_FDS entries in pollArray.
  19. try {
  20. begin();
  21. try {
  22. subSelector.poll();
  23. } catch (IOException e){
  24. finishLock.setException(e);//Save this exception
  25. }
  26. //Main thread is out of poll().Wakeup others and wait for them
  27. if(threads.size()>0)
  28. finishLock.waitForHelperThreads();
  29. } finally {
  30. end();
  31. }
  32. //Done with poll().Set wakeupSocket to nonsignaled for the next run.
  33. finishLock.checkForException();
  34. processDeregisterQueue();
  35. int updated = updateSelectedKeys();
  36. //Done with poll().Set wakeupSocket to nonsignaled for the next run.
  37. resetWakeupSocket();
  38. return updated;
  39. }
updateSelectedKeys最後會呼叫processFDSet,將SelectionKeyImpl 放入publicSelectedKeys集合中,然後selector 通過迭代器遍歷SelectionKeyImpl,因為SelectionKeyImpl包含channel資訊,可以重channel讀取資料。
  1. private int processFDSet(long updateCount, int[] fds, int rOps,
  2. boolean isExceptFds)
  3. {
  4. int numKeysUpdated =0;
  5. for(int i =1; i <= fds[0]; i++){
  6. int desc = fds[i];
  7. if(desc == wakeupSourceFd){
  8. synchronized (interruptLock){
  9. interruptTriggered = true;
  10. }
  11. continue;
  12. }
  13. <