jdk 原始碼分析(20)java NIO包簡單分析
阿新 • • 發佈:2018-12-25
BIO 是一種阻塞的io,主要體現在:
1)accept 時候或者客戶端嘗試連線時是阻塞的,
2)資料讀寫是阻塞的,即使是沒有讀到資料,而且每次都是讀寫一個位元組。
對於服務端一般系統中常用的方式是沒接收一個請求new 一個thread,然後由這個handler去處理,如果請求數目過多new 的thread 將會很多,有的時候會建立一個執行緒池,將由執行緒池管理。
如果執行緒過多,線上程中相互切換會消耗大量的效能,而實際上這些執行緒並沒有做什麼事情。同時因為重連線到資料讀取結束是一個阻塞過程,如果網路,或者伺服器效能問題,會極大消耗客戶端的效能去等待服務端的返回結果。
基於以上問題,nio出來了,
Buffer
nio定義了很多的Buffer,buffer是一個物件,儲存需要寫入或者讀出的資料。底層就是一個數組。
下面是一個位的buffer。
channel定義了一個管道(pile),用於資料讀寫,雙向的,可以同時讀寫,其 實現可以分為兩大類,FileChannel(檔案)和SelectorChannel (網路)。nio 會用到ServiceSocketChannel 和SocketChannel。
1)ServerSocketChannel serverChannel = ServerSocketChannel.open();
public abstract class ByteBuffer
extends Buffer
implements Comparable<ByteBuffer>
{
//陣列,資料直接放在堆區。
- final byte[] hb;//Non-null only for heap buffers
final int offset;
boolean isReadOnly;//Valid only for heap buffers
ByteBuffer(int mark, int pos, int lim, int cap,// package-private
byte[] hb, int offset)
{
super(mark, pos, lim, cap);
this.hb = hb;
this.offset = offset;
}
final float[] hb;其分配方法: ByteBuffer buffer = ByteBuffer.allocate(10);
public static
if(capacity <0)
throw new IllegalArgumentException();
return new HeapByteBuffer(capacity, capacity);
}
super(-1,0, lim, cap, new byte[cap],0);
channel定義了一個管道(pile),用於資料讀寫,雙向的,可以同時讀寫,其 實現可以分為兩大類,FileChannel(檔案)和SelectorChannel
public static ServerSocketChannel open() throws IOException{
returnSelectorProvider.provider().openServerSocketChannel();
}
provider = sun.nio.ch.DefaultSelectorProvider.create();
public static SelectorProvider create(){
return new WindowsSelectorProvider();
}
new ServerSocketChannelImpl(this);
ServerSocketChannelImpl(SelectorProvider var1) throws IOException{
super(var1);
this.fd =Net.serverSocket(true);
this.fdVal =IOUtil.fdVal(this.fd);
this.state =0;
}
public static Selector open() throws IOException { return SelectorProvider.provider().openSelector(); }底層是一個WindowsSelectorImpl
public AbstractSelector openSelector() throws IOException { return new WindowsSelectorImpl(this); }底層會定義一個pollWrapper 同時會啟動一個管道pipe
WindowsSelectorImpl(SelectorProvider sp) throws IOException{
super(sp);
pollWrapper = new PollArrayWrapper(INIT_CAP);
wakeupPipe =Pipe.open();
wakeupSourceFd =((SelChImpl)wakeupPipe.source()).getFDVal();
//Disable the Nagle algorithm so that the wakeup is more immediate
SinkChannelImpl sink =(SinkChannelImpl)wakeupPipe.sink();
(sink.sc).socket().setTcpNoDelay(true);
wakeupSinkFd =((SelChImpl)sink).getFDVal();
pollWrapper.addWakeupSocket(wakeupSourceFd,0);
}
protected int doSelect(long timeout) throws IOException{
if(channelArray == null)
throw new ClosedSelectorException();
this.timeout = timeout;//set selector timeout
processDeregisterQueue();
if(interruptTriggered){
resetWakeupSocket();
return0;
}
//Calculate number of helper threads needed for poll.If necessary
// threads are created here and start waiting on startLock
adjustThreadsCount();
finishLock.reset();// reset finishLock
//Wakeup helper threads, waiting on startLock, so they start polling.
//Redundant threads will exit here after wakeup.
startLock.startThreads();
//do polling in the main thread.Main thread is responsible for
// first MAX_SELECTABLE_FDS entries in pollArray.
try {
begin();
try {
subSelector.poll();
} catch (IOException e){
finishLock.setException(e);//Save this exception
}
//Main thread is out of poll().Wakeup others and wait for them
if(threads.size()>0)
finishLock.waitForHelperThreads();
} finally {
end();
}
//Done with poll().Set wakeupSocket to nonsignaled for the next run.
finishLock.checkForException();
processDeregisterQueue();
int updated = updateSelectedKeys();
//Done with poll().Set wakeupSocket to nonsignaled for the next run.
resetWakeupSocket();
return updated;
}
private int processFDSet(long updateCount, int[] fds, int rOps,
boolean isExceptFds)
{
int numKeysUpdated =0;
for(int i =1; i <= fds[0]; i++){
int desc = fds[i];
if(desc == wakeupSourceFd){
synchronized (interruptLock){
interruptTriggered = true;
}
continue;
}
<