1. 程式人生 > >SocketChannelImpl 解析一(通道連線,傳送資料)

SocketChannelImpl 解析一(通道連線,傳送資料)

ThreadLocal解析 :[url]http://donald-draper.iteye.com/blog/2368159[/url]
Java NIO ByteBuffer詳解:[url]http://donald-draper.iteye.com/blog/2357084[/url]
DirectByteBuffer簡介:[url]http://donald-draper.iteye.com/blog/2372351[/url]
SelectorProvider定義:[url]http://donald-draper.iteye.com/blog/2369615[/url]
ServerSocketChannelImpl解析:[url]http://donald-draper.iteye.com/blog/2370912[/url]
SocketChannel介面定義:[url]http://donald-draper.iteye.com/blog/2371218[/url]

[size=medium][b]引言:[/b][/size]
在SocketChannel介面定義這篇文章中,我們看了socket的連線,完成連線,是否正在建立連線,讀緩衝區寫到通道,聚集寫,讀通道寫緩衝區,分散讀等方法。在NIO包中TCP傳送接受位元組序列通過SocketChannel。今天我們來看一下SocketChannel的具體
實現。我們從SocketChannel的open方法開始。
//SocketChannel
 public static SocketChannel open() throws IOException {
return SelectorProvider.provider().openSocketChannel();
}

SelectorProvider.provider()這個過程我們就不詳說了實際是載入系統預設的SelectorProvider
例項,則個我們在SelectorProvider定義有提過,簡單看一下:
//SelectorProviderImpl
public abstract class SelectorProviderImpl extends SelectorProvider
{
public SocketChannel openSocketChannel()
throws IOException
{
return new SocketChannelImpl(this);
}
}

從上面可以看出,SocketChannel的預設實現為SocketChannelImpl。再來看SocketChannelImpl的變數
宣告和相關方法的實現。
class SocketChannelImpl extends SocketChannel
implements SelChImpl
{
private static NativeDispatcher nd = new SocketDispatcher();//socket的分發器
private final FileDescriptor fd;//檔案描述
private final int fdVal;//檔案描述id
private volatile long readerThread;//讀執行緒
private volatile long writerThread;//寫執行緒
private final Object readLock;//讀鎖
private final Object writeLock;//寫鎖
private final Object stateLock;//狀態鎖
private static final int ST_UNINITIALIZED = -1;//未初始化
private static final int ST_UNCONNECTED = 0;//未連線
private static final int ST_PENDING = 1;//正在連線
private static final int ST_CONNECTED = 2;//已連線
private static final int ST_KILLPENDING = 3;//正在關閉
private static final int ST_KILLED = 4;//關閉
private int state;//通道狀態
private SocketAddress localAddress;//socket本地地址
private SocketAddress remoteAddress;//socket遠端地址
private boolean isInputOpen;//輸入流是否開啟
private boolean isOutputOpen;//輸出流是否開啟
private boolean readyToConnect;//是否正在準備連線
private Socket socket;//通道套接字
static final boolean $assertionsDisabled = !sun/nio/ch/SocketChannelImpl.desiredAssertionStatus();
static
{
//載入nio,net資源庫
Util.load();
}
}

SocketChannelImpl的構造方法有三種分別如下
1.
 SocketChannelImpl(SelectorProvider selectorprovider)
throws IOException
{
super(selectorprovider);
readerThread = 0L;
writerThread = 0L;
//初始化讀寫及狀態鎖
readLock = new Object();
writeLock = new Object();
stateLock = new Object();
state = -1;//狀態預設為未初始化
isInputOpen = true;
isOutputOpen = true;
readyToConnect = false;
fd = Net.socket(true);//初始化檔案描述符
fdVal = IOUtil.fdVal(fd);//獲取檔案描述的值
state = 0;//已初始化,未連線
}

2.
  SocketChannelImpl(SelectorProvider selectorprovider, FileDescriptor filedescriptor, boolean flag)
throws IOException
{
super(selectorprovider);
readerThread = 0L;
writerThread = 0L;
readLock = new Object();
writeLock = new Object();
stateLock = new Object();
state = -1;
isInputOpen = true;
isOutputOpen = true;
readyToConnect = false;
fd = filedescriptor;
fdVal = IOUtil.fdVal(filedescriptor);
state = 0;//已初始化,未連線
if(flag)
//初始化本地地址
localAddress = Net.localAddress(filedescriptor);
}

3.
  SocketChannelImpl(SelectorProvider selectorprovider, FileDescriptor filedescriptor, InetSocketAddress inetsocketaddress)
throws IOException
{
super(selectorprovider);
readerThread = 0L;
writerThread = 0L;
readLock = new Object();
writeLock = new Object();
stateLock = new Object();
state = -1;
isInputOpen = true;
isOutputOpen = true;
readyToConnect = false;
fd = filedescriptor;
fdVal = IOUtil.fdVal(filedescriptor);
state = 2;//已連線
localAddress = Net.localAddress(filedescriptor);
remoteAddress = inetsocketaddress;
}

我們需要關注的是這兩點,
a.fd = Net.socket(true);//初始化檔案描述符

//Net
 static FileDescriptor socket(boolean flag)
throws IOException
{
return socket(UNSPEC, flag);
}
static FileDescriptor socket(ProtocolFamily protocolfamily, boolean flag)
throws IOException
{
boolean flag1 = isIPv6Available() && protocolfamily != StandardProtocolFamily.INET;
return IOUtil.newFD(socket0(flag1, flag, false));
}
private static native int socket0(boolean flag, boolean flag1, boolean flag2);

//IOUtil
 static FileDescriptor newFD(int i)
{
FileDescriptor filedescriptor = new FileDescriptor();
setfdVal(filedescriptor, i);
return filedescriptor;
}

這個我們在ServerSocketChannelImpl解析這篇文章接觸過Net和IOUtil,這裡不具體的解釋了
,看一下即可,很容易理解。
b.localAddress = Net.localAddress(filedescriptor);

//Net
 static InetSocketAddress localAddress(FileDescriptor filedescriptor)
throws IOException
{
return new InetSocketAddress(localInetAddress(filedescriptor), localPort(filedescriptor));
}
private static native int localPort(FileDescriptor filedescriptor)
throws IOException;

private static native InetAddress localInetAddress(FileDescriptor filedescriptor)
throws IOException;

從上面可以看出SocketChannelImpl構造主要是初始化讀寫及狀態鎖和通道socket檔案描述。
來看SocketChannelImpl的其他方法
//連線socket地址
 public boolean connect(SocketAddress socketaddress)
throws IOException
{
boolean flag = false;
Object obj = readLock;//同步讀鎖
JVM INSTR monitorenter ;//try
Object obj1 = writeLock;//同步寫鎖
JVM INSTR monitorenter ;
InetSocketAddress inetsocketaddress;
//確保socket通道處於開啟狀態,沒有連線
ensureOpenAndUnconnected();
//檢查socketAddress正確與合法性
inetsocketaddress = Net.checkAddress(socketaddress);
SecurityManager securitymanager = System.getSecurityManager();
if(securitymanager != null)
//檢查當前執行緒是否有Connect方法的訪問控制權限
securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort());
//同步regLock鎖,Lock for registration and configureBlocking operations
//這個在AbstractSelectableChannel中定義
Object obj2 = blockingLock();
JVM INSTR monitorenter ;
int i = 0;
//Marks the begin/end of an I/O operation that might block indefinitely.
begin();//與end協調使用,用於可能阻塞IO操作
boolean flag1;
//同步狀態鎖
synchronized(stateLock)
{
if(isOpen())
break MISSING_BLOCK_LABEL_149;
flag1 = false;
}
//清除Reader執行緒
readerCleanup();
end(i > 0 || i == -2);
//斷言連線結果大於-2,則連線失敗,丟擲斷言異常
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
return flag1;
if(localAddress == null)
//beforeTcpConnect為靜態空方法體,這個我們在ServerSocketChannelImpl中有說
NetHooks.beforeTcpConnect(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
//初始化讀執行緒
readerThread = NativeThread.current();
obj3;
JVM INSTR monitorexit ;
do
{
InetAddress inetaddress = inetsocketaddress.getAddress();
if(inetaddress.isAnyLocalAddress())
inetaddress = InetAddress.getLocalHost();
//嘗試連線socket地址
i = Net.connect(fd, inetaddress, inetsocketaddress.getPort());
} while(i == -3 && isOpen());
readerCleanup();
end(i > 0 || i == -2);
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
break MISSING_BLOCK_LABEL_358;
Exception exception1;
exception1;
readerCleanup();
end(i > 0 || i == -2);
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
else
throw exception1;
IOException ioexception;
ioexception;
//出現IO異常,則關閉通道
close();
throw ioexception;
Object obj4 = stateLock;
JVM INSTR monitorenter ;
remoteAddress = inetsocketaddress;
if(i <= 0) goto _L2; else goto _L1
_L1:
state = 2;
if(isOpen())
localAddress = Net.localAddress(fd);
true;
obj2;
JVM INSTR monitorexit ;//退出同步
obj1;
JVM INSTR monitorexit ;
obj;
JVM INSTR monitorexit ;
return;
_L2:
if(!isBlocking())
state = 1;
else
if(!$assertionsDisabled)
throw new AssertionError();
obj4;
JVM INSTR monitorexit ;
goto _L3
Exception exception2;
exception2;
obj4;
JVM INSTR monitorexit ;
throw exception2;
_L3:
obj2;
JVM INSTR monitorexit ;
goto _L4
Exception exception3;
exception3;
obj2;
JVM INSTR monitorexit ;
throw exception3;
_L4:
false;
obj1;
JVM INSTR monitorexit ;
obj;
JVM INSTR monitorexit ;
return;
Exception exception4;
exception4;
throw exception4;
Exception exception5;
exception5;
throw exception5;
}

connect連線方法有幾點要看:
1.
//確保socket通道處於開啟狀態,沒有連線
ensureOpenAndUnconnected();

2.
//清除Reader執行緒
readerCleanup();

3.嘗試連線socket地址
do
{
InetAddress inetaddress = inetsocketaddress.getAddress();
if(inetaddress.isAnyLocalAddress())
inetaddress = InetAddress.getLocalHost();
//嘗試連線socket地址
i = Net.connect(fd, inetaddress, inetsocketaddress.getPort());
} while(i == -3 && isOpen());

4.檢查連線結果
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
else
throw exception1;
IOException ioexception;
ioexception;
//出現IO異常,則關閉通道
close();

下面分別來看這四點:
1.
//確保socket通道處於開啟狀態,沒有連線
ensureOpenAndUnconnected();

 void ensureOpenAndUnconnected()
throws IOException
{
synchronized(stateLock)
{
if(!isOpen())//通道關閉
throw new ClosedChannelException();
if(state == 2)//已經連線
throw new AlreadyConnectedException();
if(state == 1)//正在來接
throw new ConnectionPendingException();
}
}

2.
//清除Reader執行緒
readerCleanup();

 private void readerCleanup()
throws IOException
{
synchronized(stateLock)
{
readerThread = 0L;
//連線正在關閉,則呼叫kill完成實際關閉工作
if(state == 3)
kill();
}
}

3.嘗試連線socket地址
do
{
InetAddress inetaddress = inetsocketaddress.getAddress();
if(inetaddress.isAnyLocalAddress())
inetaddress = InetAddress.getLocalHost();
//嘗試連線socket地址,這裡為什麼是循序,因為連線操作有可能被中斷,及i為-3,
//當中斷位消除時,繼續嘗試連線
i = Net.connect(fd, inetaddress, inetsocketaddress.getPort());
} while(i == -3 && isOpen());

//Net
static int connect(FileDescriptor filedescriptor, InetAddress inetaddress, int i)
throws IOException
{
return connect(UNSPEC, filedescriptor, inetaddress, i);
}

static int connect(ProtocolFamily protocolfamily, FileDescriptor filedescriptor, InetAddress inetaddress, int i)
throws IOException
{
boolean flag = isIPv6Available() && protocolfamily != StandardProtocolFamily.INET;
return connect0(flag, filedescriptor, inetaddress, i);
}

private static native int connect0(boolean flag, FileDescriptor filedescriptor, InetAddress inetaddress, int i)
throws IOException;

4.檢查連線結果
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
else
throw exception1;
IOException ioexception;
ioexception;
//出現IO異常,則關閉通道
close();

這一點我們需要關注的是IOStatus.check(i)這句:
//IOStatus
package sun.nio.ch;
final class IOStatus
{
static final int EOF = -1;//結束
static final int UNAVAILABLE = -2;//不可用
static final int INTERRUPTED = -3;//操作中斷
static final int UNSUPPORTED = -4;//不支援
static final int THROWN = -5;//異常
static final int UNSUPPORTED_CASE = -6;
private IOStatus()
{
}
static int normalize(int i)
{
if(i == -2)
return 0;
else
return i;
}
//連線結果i大於等於-2,即連線失敗
static boolean check(int i)
{
return i >= -2;
}
static long normalize(long l)
{
if(l == -2L)
return 0L;
else
return l;
}
static boolean check(long l)
{
return l >= -2L;
}
static boolean checkAll(long l)
{
return l > -1L || l < -6L;
}
}

從上面可以看出,connect連線方法首先同步讀鎖和寫鎖,確保socket通道開啟,並沒有連線;
然後檢查socket地址的正確性與合法性,然後檢查當前執行緒是否有Connect方法的訪問控制權限,
最後嘗試連線socket地址。
再來看地址繫結方法bind
 public SocketChannel bind(SocketAddress socketaddress)
throws IOException
{
//同步讀鎖,寫鎖,狀態鎖
synchronized(readLock)
{
synchronized(writeLock)
{
synchronized(stateLock)
{
if(!isOpen())//通道關閉
throw new ClosedChannelException();
if(state == 1)//正在連線
throw new ConnectionPendingException();
if(localAddress != null)
throw new AlreadyBoundException();
//檢查地址
InetSocketAddress inetsocketaddress = socketaddress != null ? Net.checkAddress(socketaddress) : new InetSocketAddress(0);
NetHooks.beforeTcpBind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
//繫結地址,這個在ServerSocketChannelImpl篇,一看過不在重複。
Net.bind(fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
//初始化localAddress
localAddress = Net.localAddress(fd);
}
}
}
return this;
}

下面來看SocketChannelImpl的幾個讀寫方法
先來看從緩衝區讀取資料,寫到通道
public int write(ByteBuffer bytebuffer)
throws IOException
{
if(bytebuffer == null)
throw new NullPointerException();
Object obj = writeLock;//同步寫鎖
JVM INSTR monitorenter ;//進入同步
int i;
//確保沒有關閉輸出流
ensureWriteOpen();
i = 0;
begin();//end,
int k;
synchronized(stateLock)
{
if(isOpen())
break MISSING_BLOCK_LABEL_140;
k = 0;
}
//清除寫執行緒
writerCleanup();
end(i > 0 || i == -2);
//同步狀態鎖,如果通道輸出流關閉或寫異常,則丟擲AsynchronousCloseException
synchronized(stateLock)
{
if(i <= 0 && !isOutputOpen)
throw new AsynchronousCloseException();
}
//斷言,檢查寫結果
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
return k;
//初始化執行緒
writerThread = NativeThread.current();
obj1;
JVM INSTR monitorexit ;
int j;
do
//寫位元組流,為什麼是迴圈寫,如果位元組序列太多,傳送緩衝區一次寫不完,需要分多次寫
i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
while(i == -3 && isOpen());
j = IOStatus.normalize(i);
writerCleanup();
end(i > 0 || i == -2);
synchronized(stateLock)
{
if(i <= 0 && !isOutputOpen)
throw new AsynchronousCloseException();
}
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
obj;
JVM INSTR monitorexit ;
return j;
Exception exception3;
exception3;
writerCleanup();
end(i > 0 || i == -2);
synchronized(stateLock)
{
if(i <= 0 && !isOutputOpen)
throw new AsynchronousCloseException();
}
if(!$assertionsDisabled && !IOStatus.check(i))
throw new AssertionError();
else
throw exception3;
Exception exception5;
exception5;
throw exception5;
}

寫操作需要關注一下幾點,
1.
//確保沒有關閉輸出流
ensureWriteOpen();

2.
 //寫位元組流
do
//寫位元組流
i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
while(i == -3 && isOpen())

3.
//清除寫執行緒
writerCleanup();

下面分別來看這三點
1.
//確保沒有關閉輸出流
ensureWriteOpen();
private void ensureWriteOpen()
throws ClosedChannelException
{
synchronized(stateLock)
{
if(!isOpen())//通道關閉
throw new ClosedChannelException();
if(!isOutputOpen)//輸出流關閉
throw new ClosedChannelException();
if(!isConnected())//還沒連線
throw new NotYetConnectedException();
}
}

2.
//寫位元組流
do
//寫位元組流,為什麼是迴圈寫,如果位元組序列太多,傳送緩衝區一次寫不完,需要分多次寫
i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
while(i == -3 && isOpen())

//IOUtil
static int write(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)
throws IOException
{
int i;
ByteBuffer bytebuffer1;
//如果ByteBffer為DirectBuffer,則呼叫writeFromNativeBuffer
if(bytebuffer instanceof DirectBuffer)
return writeFromNativeBuffer(filedescriptor, bytebuffer, l, nativedispatcher, obj);
//獲取緩衝區的當前位置
i = bytebuffer.position();
//獲取緩衝區limit位置
int j = bytebuffer.limit();
//斷言position是否大於limit,是丟擲AssertionError
if(!$assertionsDisabled && i > j)
throw new AssertionError();
int k = i > j ? 0 : j - i;//需要些的位元組數
//獲取k個位元組的臨時DirectBuffer
bytebuffer1 = Util.getTemporaryDirectBuffer(k);
int j1;
寫緩衝區到臨時記憶體緩衝區DirectBuffer-bytebuffer1
bytebuffer1.put(bytebuffer);
//轉換bytebuffer1寫模式,為讀模式
bytebuffer1.flip();
bytebuffer.position(i);//重新定位bytebuffer的position位置
//從本地緩衝空間寫位元組流,i1為已寫的位元組數
int i1 = writeFromNativeBuffer(filedescriptor, bytebuffer1, l, nativedispatcher, obj);
if(i1 > 0)
//重新定位bytebuffer的position位置
//為什麼重新定位bytebuffer的position位,
//如果位元組序列太多,傳送緩衝區一次寫不完,需要分多次寫
//將position向前移動i1位置,避免重複寫即已寫過的位元組序列。
bytebuffer.position(i + i1);
j1 = i1;
//將byteBuffer記憶體寫到當前執行緒的快取區
Util.offerFirstTemporaryDirectBuffer(bytebuffer1);
return j1;
Exception exception;
exception;
Util.offerFirstTemporaryDirectBuffer(bytebuffer1);
throw exception;
}

這一步我們有幾點要關注:
a.
//獲取k個位元組的臨時DirectBuffer
bytebuffer1 = Util.getTemporaryDirectBuffer(k);

想要理解這點,先看一下Util的定義
//Util
class Util
{
private static final int TEMP_BUF_POOL_SIZE;//臨時緩衝區大小
private static ThreadLocal localSelector = new ThreadLocal();
private static ThreadLocal localSelectorWrapper = new ThreadLocal();
private static Unsafe unsafe = Unsafe.getUnsafe();
private static int pageSize = -1;
private static volatile Constructor directByteBufferConstructor = null;
private static volatile Constructor directByteBufferRConstructor = null;
private static volatile String bugLevel = null;
private static boolean loaded = false;
static final boolean $assertionsDisabled = !sun/nio/ch/Util.desiredAssertionStatus();
static
{
//初始化臨時緩衝區大小,為IOUtil的IOV_MAX,及系統預設最大IO緩衝區大小
//static final int IOV_MAX = iovMax();
//static native int iovMax();
TEMP_BUF_POOL_SIZE = IOUtil.IOV_MAX;
}
//執行緒本地快取區
private static ThreadLocal bufferCache = new ThreadLocal() {

protected BufferCache initialValue()
{
return new BufferCache();
}

protected volatile Object initialValue()
{
return initialValue();
}

};
}

//IOUtil,變數IOV_MAX
static native int iovMax();
static final int IOV_MAX = iovMax();

再來看Util的緩衝區的定義BufferCache
//Util
 private static class BufferCache
{
//存放位元組序列的快取陣列,可以這麼理解buffers為
//當前緩衝區存放的位元組序列ByteBuffer
//buffers的size,即為當前緩衝區可以接受寫多少個位元組序列ByteBuffer
private ByteBuffer buffers[];
private int count;//當前緩衝區中,有資料的位元組序列ByteBuffer的個數,即buffers計數器
private int start;//緩衝區buffers的開始索引,即頭部
static final boolean $assertionsDisabled = !sun/nio/ch/Util.desiredAssertionStatus();
BufferCache()
{
//初始化緩衝區
buffers = new ByteBuffer[Util.TEMP_BUF_POOL_SIZE];
}
//向緩衝區的頭部新增一個位元組序列bytebuffer,即寫位元組序列到快取區
boolean offerFirst(ByteBuffer bytebuffer)
{
if(count >= Util.TEMP_BUF_POOL_SIZE)
{
//如果當前緩衝區已滿,則返回false,即當前不能寫位元組序列到快取區
return false;
} else
{
//獲取緩衝區byteBuffers的當前頭部索引start的前一個索引
start = ((start + Util.TEMP_BUF_POOL_SIZE) - 1) % Util.TEMP_BUF_POOL_SIZE;
//寫位元組序列到快取區的索引start對應的ByteBuffer
buffers[start] = bytebuffer;
count++;//緩衝區bytebuffer計數器+1
return true;//寫位元組序列到快取區成功
}
}
//這個與offerFirst恰好相反,寫位元組序列到緩衝區的尾部(索引start + count)
boolean offerLast(ByteBuffer bytebuffer)
{
if(count >= Util.TEMP_BUF_POOL_SIZE)
{
return false;
} else
{
int i = (start + count) % Util.TEMP_BUF_POOL_SIZE;
buffers[i] = bytebuffer;
count++;
return true;
}
}
//緩衝區buffers,索引向後移動
private int next(int i)
{
return (i + 1) % Util.TEMP_BUF_POOL_SIZE;
}
//注意這個i不是索引的意思,是需要寫的位元組序列的位元組個數,
//這個在IOUtil的write方法中呼叫,如下面兩行程式碼
//獲取k個位元組的臨時DirectBuffer
//bytebuffer1 = Util.getTemporaryDirectBuffer(k);
ByteBuffer get(int i)
{
//如果快取區當前可用的可用的ByteBuffer,返回null
if(count == 0)
return null;
ByteBuffer abytebuffer[] = buffers;
ByteBuffer bytebuffer = abytebuffer[start];
//如果當前緩衝區start索引對應的bytebuffer,不夠用,即容量不夠存放要寫的位元組序列
//則遍歷當前buffers,找到可以存放的bytebuffer
if(bytebuffer.capacity() < i)
{
bytebuffer = null;
int j = start;
do
{
if((j = next(j)) == start)
//只有一個bytebuffer,break
break;
ByteBuffer bytebuffer1 = abytebuffer[j];
if(bytebuffer1 == null)
//下一個bytebuffer為null,break
break;
if(bytebuffer1.capacity() < i)
//容量不夠用,continue
continue;
//找到可以存放i個位元組序列的bytebuffer
bytebuffer = bytebuffer1;
break;
} while(true);
if(bytebuffer == null)
return null;
abytebuffer[j] = abytebuffer[start];
}
//清空
abytebuffer[start] = null;
start = next(start);
count--;//緩衝區bytebuffer計數器-1
//呼叫rewind,為了從開始位置寫位元組流
bytebuffer.rewind();
bytebuffer.limit(i);//限制bytebuffer的可用空間limit
return bytebuffer;
}
//緩衝區是否為空
boolean isEmpty()
{
return count == 0;
}
//移除緩衝區頭部的bytebuffer
ByteBuffer removeFirst()
{
//如果斷言開啟, 緩衝區為空,丟擲斷言異常
if(!$assertionsDisabled && count <= 0)
{
throw new AssertionError();
} else
{
//有了上面幾個方法,下面應該很好理解,就不說了
ByteBuffer bytebuffer = buffers[start];
buffers[start] = null;
start = next(start);
count--;
return bytebuffer;
}
}
}

從上面可以看出BufferCache用一個ByteBuffer陣列buffers存放寫到緩衝區的位元組流序列,每次寫位元組流對應一個ByteBuffer,用count記錄當前緩衝區中的有資料或可用的ByteBuffer數量,start記錄當前緩衝區buffers的頭部;offerFirst方法向緩衝區的頭部新增一個位元組序列bytebuffer,即寫位元組序列到快取區;offerLast與offerFirst恰好相反,寫位元組序列到緩衝區的尾部(索引start + count);next方法為向後移動緩衝區buffers索引;get(int i)方法為從緩衝區獲取可以存放i個位元組序列的ByteBuffer,並rewind位元組緩衝區ByteBuffer,
限制孔勇空間為ByteBuffer。removeFirst為移除緩衝區頭部的bytebuffer,並返回。

看過Util的BufferCache的定義,我們再回到
//獲取k個位元組的臨時DirectBuffer
bytebuffer1 = Util.getTemporaryDirectBuffer(k);

//Util
static ByteBuffer getTemporaryDirectBuffer(int i)
{
//獲取當前執行緒的緩衝區(ThreadLocal-bufferCache)
BufferCache buffercache = (BufferCache)bufferCache.get();
//從緩衝區獲取容量第一個大於i的ByteBuffer
ByteBuffer bytebuffer = buffercache.get(i);
//如果緩衝區存在容量大於i個位元組的bytebuffer,直接返回
if(bytebuffer != null)
return bytebuffer;
//如果緩衝區中不存在容量大於i的bytebuffer,且不為空;
//則移除緩衝區頭部的bytebuffer
if(!buffercache.isEmpty())
{
ByteBuffer bytebuffer1 = buffercache.removeFirst();
//釋放bytebuffer1
free(bytebuffer1);
}
//ByteBuffer直接分配一個DirectByteBuffer,存放位元組序列
return ByteBuffer.allocateDirect(i);
}

獲取臨時DirectByteBuffer有兩點要看
a.1
//釋放bytebuffer1
free(bytebuffer1);

//Util
 private static void free(ByteBuffer bytebuffer)
{
//實際委託給DirectBuffer的clean,這個我們在DirectByteBuffer有說,
//即釋放分配的實際實體記憶體
((DirectBuffer)bytebuffer).cleaner().clean();
}

//DirectBuffer
package sun.nio.ch;
import sun.misc.Cleaner;
public interface DirectBuffer
{
public abstract long address();
public abstract Object attachment();
public abstract Cleaner cleaner();
}

a.2
public static ByteBuffer allocateDirect(int capacity) {
return new DirectByteBuffer(capacity);
}

b.
//從本地緩衝空間寫位元組流,i1為已寫的位元組數
int i1 = writeFromNativeBuffer(filedescriptor, bytebuffer1, l, nativedispatcher, obj);

//nativedispatcher引數實際為SocketDispatcher
private static int writeFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, long l, NativeDispatcher nativedispatcher, Object obj)
throws IOException
{
int i = bytebuffer.position();
int j = bytebuffer.limit();
if(!$assertionsDisabled && i > j)
throw new AssertionError();
int k = i > j ? 0 : j - i;
int i1 = 0;
if(k == 0)
return 0;
if(l != -1L)
//這個方法在Nativedispatcher定義,在SocketDispatcher並沒有實現,obj為writeLock
i1 = nativedispatcher.pwrite(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);
else
//預設的寫操作
i1 = nativedispatcher.write(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);
if(i1 > 0)
//將position向前移動i1位置,避免重複寫即已寫過的位元組序列
bytebuffer.position(i + i1);
return i1;
}

來看兩種方式的寫
b.1
 if(l != -1L)
//這個在Nativedispatcher,在SocketDispatcher並沒有實現
i1 = nativedispatcher.pwrite(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, l, obj);

//Nativedispatcher
 int pwrite(FileDescriptor filedescriptor, long l, int i, long l1, Object obj)
throws IOException
{
//操作當前JDK,不支援,留待以後擴充套件用吧,我的JDK為1.7.0.17
throw new IOException("Operation Unsupported");
}

b.2
else
//預設的寫操作
i1 = nativedispatcher.write(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k);

//SocketDispatcher
int write(FileDescriptor filedescriptor, long l, int i)
throws IOException
{
return write0(filedescriptor, l, i);
}
static native int write0(FileDescriptor filedescriptor, long l, int i)
throws IOException;

從緩衝讀取位元組序列,寫到通道中,實際是通過SocketDispatcher完成實際的寫工作,當前預設的寫方法為write(FileDescriptor filedescriptor, long l, int i)。
c.
//新增bytebuffer到執行緒當前緩衝區
Util.offerFirstTemporaryDirectBuffer(bytebuffer1);

static void offerFirstTemporaryDirectBuffer(ByteBuffer bytebuffer)
{
if(!$assertionsDisabled && bytebuffer == null)
throw new AssertionError();
//獲取當前執行緒緩衝區
BufferCache buffercache = (BufferCache)bufferCache.get();
//將bytebuffer新增到緩衝區
if(!buffercache.offerFirst(bytebuffer))
free(bytebuffer);
}

3.
//清除寫執行緒
writerCleanup();
private void writerCleanup()
throws IOException
{
synchronized(stateLock)
{
writerThread = 0L;
if(state == 3)
//這個kill操作,我們會在後面再講
kill();
}
}

從以上分析可以看出,從緩衝區讀取位元組序列寫到通道,首先確保通道開啟,且輸出流沒有關閉,然後委託給IOUtil寫位元組序列;IOUtil寫位元組流過程為首先通過Util從當前執行緒的緩衝區獲取可以容下位元組序列的臨時緩衝區(DirectByteBuffer),如果沒有則建立一個DirectByteBuffer,將位元組序列寫到臨時的DirectByteBuffer中,然後將寫操作委託給nativedispatcher(SocketDispatcher),將DirectByteBuffer新增到當前執行緒的緩衝區,
以便重用,因為DirectByteBuffer實際上是存在實體記憶體中,頻繁的分配將會消耗更多的資源。
[size=medium][b]
總結:[/b][/size]
[color=blue]SocketChannelImpl構造主要是初始化讀寫及狀態鎖和通道socket檔案描述。
connect連線方法首先同步讀鎖和寫鎖,確保socket通道開啟,並沒有連線;然後檢查socket地址的正確性與合法性,然後檢查當前執行緒是否有Connect方法的訪問控制權限,最後嘗試連線socket地址。從緩衝區讀取位元組序列寫到通道write(ByteBuffer),首先確保通道開啟,且輸出流沒有關閉,然後委託給IOUtil寫位元組序列;IOUtil寫位元組流過程為首先通過Util從當前執行緒的緩衝區獲取可以容下位元組序列的臨時緩衝區(DirectByteBuffer),如果沒有則建立一個DirectByteBuffer,將位元組序列寫到臨時的DirectByteBuffer中,然後將寫操作委託給nativedispatcher(SocketDispatcher),將DirectByteBuffer新增到當前執行緒的緩衝區,
以便重用,因為DirectByteBuffer實際上是存在實體記憶體中,頻繁的分配將會消耗更多的資源。[/color]
SocketChannelImpl 解析二(傳送資料後續):[url]http://donald-draper.iteye.com/blog/2372548[/url]
附:
許可權檢查:SecurityManager為系統的預設安全檢查管理器,主要用於檢查當前執行緒是否擁有
某個許可權的訪問控制權限,比如socket連線,監聽,獲取類載入等。
//SecurityManager
//檢查socket連線許可權
public void checkConnect(String host, int port) {
if (host == null) {
throw new NullPointerException("host can't be null");
}
if (!host.startsWith("[") && host.indexOf(':') != -1) {
host = "[" + host + "]";
}
if (port == -1) {
checkPermission(new SocketPermission(host,
SecurityConstants.SOCKET_RESOLVE_ACTION));
} else {
//檢查是否socket連線訪問控制權限
checkPermission(new SocketPermission(host+":"+port,
SecurityConstants.SOCKET_CONNECT_ACTION));
}
}
public void checkPermission(Permission perm) {
//檢查是否perm的訪問控制權限
java.security.AccessController.checkPermission(perm);
}

//SecurityConstants,安全許可權常量
public final class SecurityConstants
{
//AWT為建立圖形介面相關許可權
public static class AWT
{
private static PermissionFactory permissionFactory()
{
Class class1;
class1 = (Class)AccessController.doPrivileged(new PrivilegedAction() {

public Class run()
{
return Class.forName("sun.awt.AWTPermissionFactory", true, null);
ClassNotFoundException classnotfoundexception;
classnotfoundexception;
return null;
}

public volatile Object run()
{
return run();
}

});
if(class1 == null)
break MISSING_BLOCK_LABEL_52;
return (PermissionFactory)class1.newInstance();
Object obj;
obj;
throw new InternalError(((InstantiationException) (obj)).getMessage());
obj;
throw new InternalError(((IllegalAccessException) (obj)).getMessage());
return new FakeAWTPermissionFactory();
}
private static Permission newAWTPermission(String s)
{
return factory.newPermission(s);
}
private static final String AWTFactory = "sun.awt.AWTPermissionFactory";
private static final PermissionFactory factory = permissionFactory();
public static final Permission TOPLEVEL_WINDOW_PERMISSION = newAWTPermission("showWindowWithoutWarningBanner");
public static final Permission ACCESS_CLIPBOARD_PERMISSION = newAWTPermission("accessClipboard");//訪問貼上板
public static final Permission CHECK_AWT_EVENTQUEUE_PERMISSION = newAWTPermission("accessEventQueue");
public static final Permission TOOLKIT_MODALITY_PERMISSION = newAWTPermission("toolkitModality");
public static final Permission READ_DISPLAY_PIXELS_PERMISSION = newAWTPermission("readDisplayPixels");
public static final Permission CREATE_ROBOT_PERMISSION = newAWTPermission("createRobot");
public static final Permission WATCH_MOUSE_PERMISSION = newAWTPermission("watchMousePointer");
public static final Permission SET_WINDOW_ALWAYS_ON_TOP_PERMISSION = newAWTPermission("setWindowAlwaysOnTop");
public static final Permission ALL_AWT_EVENTS_PERMISSION = newAWTPermission("listenToAllAWTEvents");
public static final Permission ACCESS_SYSTEM_TRAY_PERMISSION = newAWTPermission("accessSystemTray");


private AWT()
{
}
}
private static class FakeAWTPermission extends BasicPermission
{

public String toString()
{
return (new StringBuilder()).append("(\"java.awt.AWTPermission\" \"").append(getName()).append("\")").toString();
}

private static final long serialVersionUID = -1L;

public FakeAWTPermission(String s)
{
super(s);
}
}
private static class FakeAWTPermissionFactory
implements PermissionFactory
{

public FakeAWTPermission newPermission(String s)
{
return new FakeAWTPermission(s);
}

public volatile Permission newPermission(String s)
{
return newPermission(s);
}

private FakeAWTPermissionFactory()
{
}

}
private SecurityConstants()
{
}
public static final String FILE_DELETE_ACTION = "delete";//檔案刪除
public static final String FILE_EXECUTE_ACTION = "execute";//檔案執行
public static final String FILE_READ_ACTION = "read";//檔案讀
public static final String FILE_WRITE_ACTION = "write";//寫檔案
public static final String FILE_READLINK_ACTION = "readlink";
public static final String SOCKET_RESOLVE_ACTION = "resolve";
public static final String SOCKET_CONNECT_ACTION = "connect";//socket連線
public static final String SOCKET_LISTEN_ACTION = "listen";//socket監聽
public static final String SOCKET_ACCEPT_ACTION = "accept";//socket接受連線
public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";//socket連線,接受連線
public static final String PROPERTY_RW_ACTION = "read,write";//讀寫屬性
public static final String PROPERTY_READ_ACTION = "read";//讀屬性
public static final String PROPERTY_WRITE_ACTION = "write";//寫屬性
public static final AllPermission ALL_PERMISSION = new AllPermission();
public static final NetPermission SPECIFY_HANDLER_PERMISSION = new NetPermission("specifyStreamHandler");
public static final NetPermission SET_PROXYSELECTOR_PERMISSION = new NetPermission("setProxySelector");
public static final NetPermission GET_PROXYSELECTOR_PERMISSION = new NetPermission("getProxySelector");
public static final NetPermission SET_COOKIEHANDLER_PERMISSION = new NetPermission("setCookieHandler");
public static final NetPermission GET_COOKIEHANDLER_PERMISSION = new NetPermission("getCookieHandler");
public static final NetPermission SET_RESPONSECACHE_PERMISSION = new NetPermission("setResponseCache");
public static final NetPermission GET_RESPONSECACHE_PERMISSION = new NetPermission("getResponseCache");
//建立類載入器
public static final RuntimePermission CREATE_CLASSLOADER_PERMISSION = new RuntimePermission("createClassLoader");
public static final RuntimePermission CHECK_MEMBER_ACCESS_PERMISSION = new RuntimePermission("accessDeclaredMembers");
//修改執行緒
public static final RuntimePermission MODIFY_THREAD_PERMISSION = new RuntimePermission("modifyThread");
//修改執行緒分組資訊
public static final RuntimePermission MODIFY_THREADGROUP_PERMISSION = new RuntimePermission("modifyThreadGroup");
public static final RuntimePermission GET_PD_PERMISSION = new RuntimePermission("getProtectionDomain");
//獲取類載入器
public static final RuntimePermission GET_CLASSLOADER_PERMISSION = new RuntimePermission("getClassLoader");
public static final RuntimePermission STOP_THREAD_PERMISSION = new RuntimePermission("stopThread");
public static final RuntimePermission GET_STACK_TRACE_PERMISSION = new RuntimePermission("getStackTrace");
public static final SecurityPermission CREATE_ACC_PERMISSION = new SecurityPermission("createAccessControlContext");
public static final SecurityPermission GET_COMBINER_PERMISSION = new SecurityPermission("getDomainCombiner");
public static final SecurityPermission GET_POLICY_PERMISSION = new SecurityPermission("getPolicy");
public static final SocketPermission LOCAL_LISTEN_PERMISSION = new SocketPermission("localhost:1024-", "listen");

}