Channel詳解
Channel簡介
在Java NIO中,主要有三大基本的元件:Buffer、Channel和Selector,前面兩篇文章我們具體介紹了Selector和Buffer,老規矩,就讓我們繼續慢慢地揭開Channel的神祕面紗吧!
在Java NIO的世界中,Selector是 中央控制器
,Buffer是承載資料的 容器
,而Channel可以說是最基礎的 門面
,它是本地I/O裝置、網路I/O的 通訊橋樑
,只有搭建了這座橋樑,資料才能被寫入Buffer,連線才能被Selector控制,
Channel這座橋樑分別為本地I/O裝置和網路I/O提供了以下實現,並且和Java IO體系的類是一一對應的:
- 網路I/O裝置:
- DatagramChannel:讀寫UDP通訊的資料,對應DatagramSocket類
- SocketChannel:讀寫TCP通訊的資料,對應Socket類
- ServerSocketChannel:監聽新的TCP連線,並且會建立一個可讀寫的SocketChannel,對應ServerSocket類
- 本地I/O裝置:
- FileChannel:讀寫本地檔案的資料,不支援Selector控制,對應File類
其類繼承結構如下圖:

Channel類繼承結構圖
- 從上圖中我們可以看出前面講述的四個類都是被定義為抽象的,這些類中只是聲明瞭可操作的介面;主要是在不同的作業系統當中,其實際操作本地I/O和網路I/O在實現上會有根本性的差異,就拿Windows和Unix來說,兩者的檔案系統管理是不一致的(想了解三者I/O架構上的區別可參考 Unix,Linux,Windows的IO架構 )
- Channel介面實現了Closeable介面,並且本身還定義
isOpen()
方法,標識所有的Channel都是可以被主動關閉 -
InterruptibleChannel
介面聲明瞭Channel是可以被中斷的 -
SelectableChannel
介面聲明瞭Channel是可以被選擇的(即支援Selector控制),而FileChannel是沒有實現該介面的 -
WritableByteChannel
和ReadableByteChannel
介面分別提供了寫操作和讀操作的API,且是基於Buffer的 -
ScatteringByteChannel
和GatheringByteChannel
介面允許您委託作業系統來完成辛苦活:將讀取到的資料分開存放到多個儲存桶(bucket)或者將不同的資料區塊合併成一個整體。這是一個巨大的成就,因為作業系統已經被高度優化來完成此類工作了。它節省了您來回移動資料的工作,也就避免了緩衝區拷貝和減少了您需要編寫、除錯的程式碼數量。其分別定義了write(ByteBuffer[] srcs, int offset, int length)
和read(ByteBuffer[] dsts, int offset, int length)
-
SeekableByteChannel
介面用於控制本地檔案的position -
NetworkChannel
介面標識了該Channel是屬於網路I/O
ServerSocketChannel
讓我們從最簡單的 ServerSocketChannel
來開始對socket通道類的討論。以下是 ServerSocketChannel
的完整 API:
public abstract class ServerSocketChannel extends AbstractSelectableChannel { public static ServerSocketChannel open() throws IOException public abstract ServerSocket socket(); public abstract SocketChannel accept() throws IOException; //支援的SelectionKey型別,返回OP_ACCEPT public final int validOps() }
ServerSocketChannel
與 ServerSocket
一樣是socket監聽器,其主要區別前者可以執行在非阻塞模式下執行;
// 建立一個ServerSocketChannel,將會關聯一個未繫結的ServerSocket public static ServerSocketChannel open() throws IOException { return SelectorProvider.provider().openServerSocketChannel(); }
ServerSocketChannel
的建立也是依賴底層作業系統實現,其實現類主要是 ServerSocketChannelImpl
,我們來看看其構造方法
class ServerSocketChannelImpl extends ServerSocketChannel implements SelChImpl { private final FileDescriptor fd; private int fdVal; // 這裡忽略一些變數 ..... private int state = -1; ServerSocketChannelImpl(SelectorProvider var1) throws IOException { super(var1); // 建立一個檔案操作符 this.fd = Net.serverSocket(true); // 得到檔案操作符是索引 this.fdVal = IOUtil.fdVal(this.fd); this.state = 0; }
新建一個 ServerSocketChannelImpl
其本質是在底層作業系統建立了一個fd(即 檔案描述符 ),相當於建立了一個用於網路通訊的通道,通過這個通道我們可以和外部網路進行通訊;
當然上述操作只是搶佔了一個通道,它是無法和外部通訊的;我們知道,在實際網路互動中,必須通過端口才能通訊,所以呢,下一步我們來看看如何繫結埠
ServerSocketChannel
貌似沒有 bind()
方法來繫結埠,上面我們提到它在建立時會新建一個fd,其本質對應了 ServetSocket
物件,我們看 ServerSocketChannel
的API能看到通過 socket()
物件能獲取到 ServetSocket
,此時我們只要呼叫socket的 bind()
方法繫結即可
ServerSocketChannel#socket#bind(InetSocketAddress)
ServerSocketChannel
最主要的作用就是用於監聽TCP連線,其API中也有相應的 accept()
方法來獲取TCP連線
public SocketChannel accept() throws IOException { // 忽略一些校驗及無關程式碼 .... SocketChannelImpl var2 = null; // var3的作用主要是說明當前的IO狀態,主要有 /** * EOF = -1; * UNAVAILABLE = -2; * INTERRUPTED = -3; * UNSUPPORTED = -4; * THROWN = -5; * UNSUPPORTED_CASE = -6; */ int var3 = 0; // 這裡本質也是用fd來獲取連線 FileDescriptor var4 = new FileDescriptor(); // 用來儲存TCP連線的地址資訊 InetSocketAddress[] var5 = new InetSocketAddress[1]; try { // 這裡設定了一箇中斷器,中斷時會將連線關閉 this.begin(); // 這裡當IO被中斷時,會重新獲取連線 do { var3 = this.accept(this.fd, var4, var5); } while(var3 == -3 && this.isOpen()); }finally { // 當連線被關閉且accept失敗時或丟擲AsynchronousCloseException this.end(var3 > 0); // 驗證連線是可用的 assert IOStatus.check(var3); } if (var3 < 1) { return null; } { // 預設連線是阻塞的 IOUtil.configureBlocking(var4, true); // 建立一個SocketChannel的引用 var2 = new SocketChannelImpl(this.provider(), var4, var5[0]); // 下面是是否連線成功校驗,這裡忽略... return var2; } } // 依賴底層作業系統實現的accept0方法 private int accept(FileDescriptor var1, FileDescriptor var2, InetSocketAddress[] var3) throws IOException { return this.accept0(var1, var2, var3); }
SocketChannel
使用 ServerSocketChannel
可以實時獲取到新建的TCP連線,從上面 accpet()
方法得出,其返回的是一個 SocketChannelImpl
物件,其繼承的類的是 SocketChannel
,以下是 SocketChannel
的API:
public abstract class SocketChannel extends AbstractSelectableChannel implements ByteChannel, ScatteringByteChannel, GatheringByteChannel { // 這裡僅列出部分API public static SocketChannel open() throws IOException public static SocketChannel open(InetSocketAddress remote) throws IOException public abstract Socket socket(); public abstract boolean connect (SocketAddress remote) throws IOException; // 當前的連線channel是否有併發連線,非阻塞狀態下才有可能返回true public abstract boolean isConnectionPending(); //呼叫finishConnect()方法來完成連線過程,該方法任何時候都可以安全地進行呼叫。假如在一個非阻塞模式的SocketChannel物件上呼叫finishConnect()方法,將可能出現下列情形之一: /** * 1.connect()方法尚未被呼叫。那麼將產生NoConnectionPendingException異常。 * 2.連線建立過程正在進行,尚未完成。那麼什麼都不會發生,finishConnect()方法會立即返回false值。 * 3.在非阻塞模式下呼叫connect()方法之後,SocketChannel又被切換回了阻塞模式。那麼如果有必要的話,呼叫執行緒會阻塞直到連線建立完成,finishConnect()方法接著就會返回true值。 * 4.在初次呼叫connect()或最後一次呼叫finishConnect()之後,連線建立過程已經完成。那麼SocketChannel物件的內部狀態將被更新到已連線狀態,finishConnect()方法會返回true值,然後SocketChannel物件就可以被用來傳輸資料了。 * 5.連線已經建立。那麼什麼都不會發生,finishConnect()方法會返回true值。 */ public abstract boolean finishConnect() throws IOException; // 是否連線成功 public abstract boolean isConnected(); // 支援的SelectionKey型別,返回OP_CONNECT,OP_READ,OP_WRITE public final int validOps(); public abstract int read(ByteBuffer dst) throws IOException; public abstract int write(ByteBuffer src) throws IOException; }
上文我們提到 SocketChannel
是用於讀寫TCP通訊的資料,與 Socket
類一致,其封裝的是點對點、有序的網路連線;一個 SocketChannel
的建立必然伴隨著會建立一個同等的 Socket
物件(實際是 SocketAdaptor
),通過 socket
方法能獲取;
從API的方法名我們不難看出,其主要作用是
- 通過
open
方法建立SocketChannel
, - 然後利用
connect
方法來和服務端發起建立連線,還支援了一些判斷連線建立情況的方法; -
read
和write
支援最基本的讀寫操作
下面跟隨著上述4點讓我們來探究下其底層是怎麼工作。
(1)open建立過程
public static SocketChannel open() throws IOException { return SelectorProvider.provider().openSocketChannel(); }
與 ServerSocketChannel
一樣, SocketChannel
的建立也是依賴底層作業系統實現,其實現類主要是 SocketChannelImpl
,建立過程比較簡單,例項化了一個fd,並將當前Channel的狀態置為了 未連線
class SocketChannelImpl extends SocketChannel implements SelChImpl { // Our file descriptor object private final FileDescriptor fd; // fd value needed for dev/poll. This value will remain valid // even after the value in the file descriptor object has been set to -1 private final int fdVal; // Lock held by current reading or connecting thread private final Object readLock = new Object(); // Lock held by current writing or connecting thread private final Object writeLock = new Object(); // Lock held by any thread that modifies the state fields declared below // DO NOT invoke a blocking I/O operation while holding this lock! private final Object stateLock = new Object(); // State, increases monotonically private static final int ST_UNINITIALIZED = -1; private static final int ST_UNCONNECTED = 0; private static final int ST_PENDING = 1; private static final int ST_CONNECTED = 2; private static final int ST_KILLPENDING = 3; private static final int ST_KILLED = 4; private int state = ST_UNINITIALIZED; SocketChannelImpl(SelectorProvider sp) throws IOException { super(sp); // 建立一個scoket通道,即fd(fd的作用可參考上面的描述) this.fd = Net.socket(true); // 得到該fd的索引 this.fdVal = IOUtil.fdVal(fd); this.state = ST_UNCONNECTED; } }
(2)connect建立連線
// Channel的連線過程,這隻附了關鍵部分程式碼 public boolean connect(SocketAddress sa) throws IOException { // 讀寫都鎖住 lock(readLock&writeLock) { /****狀態檢查,channel和address****/ // 判斷channel是否 ensureOpenAndUnconnected(); InetSocketAddress isa = Net.checkAddress(sa); /****連線建立****/ // 阻塞狀態變更的鎖也鎖住 lock(blockingLock) { lock(stateLock) { // 如果當前socket未繫結本地埠,則嘗試著判斷和服務端是否能建立連線 if (localAddress == null) { // 和遠端建立連線後關閉連線,待會我們詳細說一下這個方法 NetHooks.beforeTcpConnect(fd,isa.getAddress(),isa.getPort()); } } for(;;) { // 建立連線 n = Net.connect(fd , ia , isa.getPort()); // 中斷會重新嘗試 if((n == IOStatus.INTERRUPTED) && isOpen()) { continue; } break; } } /****狀態變更****/ lock(stateLock) { if(n > 0) { state = ST_CONNECTED; return; } // 如果連線尚未建立成功,且當前channel是非阻塞的,狀態置為pending,此時不允許其他呼叫,呼叫時會拋ConnectionPendingException if (!isBlocking()) state = ST_PENDING; } } } // 這裡補充介紹一下NetHooks.beforeTcpConnect方法,這個方法在其他地方也可能遇到 /**********其呼叫鏈路如下**********/ // 1. NetHooks#beforeTcpConnect // 直接被代理跳轉到SdpProvider provider.implBeforeTcpConnect(fdObj, address, port); // 2. SdpProvider#implBeforeTcpConnect // 主要是先通過規則校驗器判斷入參是否符合,一般有PortRangeRule校驗器 // 然後再執行將fd轉換為socket for (Rule rule: rules) { if (rule.match(action, address, port)) { SdpSupport.convertSocket(fdObj); matched = true; break; } } // 3. SdpSupport#convertSocket // 獲取fd的索引 int fdVal = fdAccess.get(fd); // 跳轉到native方法 convert0(fdVal); // 4. SdpSupport.c#convert0 JNIEXPORT void JNICALL Java_sun_net_sdp_SdpSupport_convert0(JNIEnv *env, jclass cls, int fd) { // create方法實際是通過socket(AF_INET_SDP, SOCK_STREAM, 0);方法得到一個socket int s = create(env); if (s >= 0) { socklen_t len; int arg, res; struct linger linger; /* copy socket options that are relevant to SDP */ len = sizeof(arg); // 重用TIME_WAIT的埠 if (getsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, &len) == 0) setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char*)&arg, len); len = sizeof(arg); // 緊急資料放入普通資料流 if (getsockopt(fd, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, &len) == 0) setsockopt(s, SOL_SOCKET, SO_OOBINLINE, (char*)&arg, len); len = sizeof(linger); // 延遲關閉連線 if (getsockopt(fd, SOL_SOCKET, SO_LINGER, (void*)&linger, &len) == 0) setsockopt(s, SOL_SOCKET, SO_LINGER, (char*)&linger, len); // 將fd也引用到s所持有的通道 RESTARTABLE(dup2(s, fd), res); if (res < 0) JNU_ThrowIOExceptionWithLastError(env, "dup2"); // 執行close方法,關閉s這個引用 RESTARTABLE(close(s), res); } }
(3)read
ok,現在Channel已經通過connect方法與伺服器連線好了連線,下面我們開始試著讀寫channel吧
讓我們回顧一下,在[Select詳解]文章中,我們寫了個簡單的Selector的例子,其中就包括了讀和寫操作,我們先看看讀操作:
..... while ((ret = socketChannel.read(buf)) > 0){ readBytes += ret; } .....
從具體 socketChannel
中讀取內容至buf,返回的是當前IO的狀態,讓我們來探究下原始碼
1.SocketChannelImpl#read(ByteBuffer buf) { lock(readLock) { // 如果buf=null,丟擲NullPointerException .... // 這裡有個判斷,當channel被關閉時直接返回0 ..... // 核心讀邏輯 for (;;) { // 通過IOUtil的讀取fd的資料至buf // 這裡的nd是SocketDispatcher,用於呼叫底層的read和write操作 n = IOUtil.read(fd, buf, -1, nd); if ((n == IOStatus.INTERRUPTED) && isOpen()) { continue; } // 這個方法主要是將UNAVAILABLE(原為-2)這個狀態返回0,否則返回n return IOStatus.normalize(n); } } } 2.IOUtil.read(fd , buf , position , nd) { // 如果buf是隻可讀,則丟擲異常 throw IllegalArgumentException("Read-only buffer"); if (dst instanceof DirectBuffer) return readIntoNativeBuffer(fd, buf, position, nd); // 臨時緩衝區,大小為buf的remain(limit - position),堆外記憶體,使用ByteBuffer.allocateDirect(size)分配 // Notes:這裡分配後後面有個try-finally塊會釋放該部分記憶體 ByteBuffer bb = Util.getTemporaryDirectBuffer(buf.remaining()); // 將網路中的buf讀進direct buffer int n = readIntoNativeBuffer(fd, bb, position, nd); // 待讀取 bb.flip(); if (n > 0) // 成功時寫入 buf.put(bb); return n; } 3.IOUtil.readIntoNativeBuffer(fd , buf , position , nd) { // ... 忽略一些獲取buf變數的程式碼 if (position != -1) { // pread方法只有在同步狀態下才能使用 n = nd.pread(fd ,((DirectBuffer)bb).address() + pos,rem, position); } else { // 其呼叫SocketDispatcher.read方法 -> FileDispatcherImpl.read0方法 n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem); } } 4.FileDispatcherImpl.read0 { // 獲取fd索引 jint fd = fdval(env, fdo); void *buf = (void *)jlong_to_ptr(address); // 呼叫底層read方法 return convertReturnVal(env, read(fd, buf, len), JNI_TRUE); }
總結一下讀取的過程
- 初始化一個direct buffer,如果本身的buffer就是direct的則不用初始化
- 呼叫底層read方法寫入至direct buffer
- 最終將direct buffer寫到傳入的buffer物件
(4)write
ok,看完了讀的過程,我們在看看寫的過程;還是之前例子中的寫入程式碼,如下
if (buf.hasRemaining()) { socketChannel.write(buf); }
繼續探究下原始碼
1.SocketChannelImpl#write(ByteBuffer buf) { lock(writeLock) { //... for (;;) { // 通過IOUtil的讀取fd的資料至buf // 這裡的nd是SocketDispatcher,用於呼叫底層的read和write操作 n = IOUtil.write(fd, buf, -1, nd); if ((n == IOStatus.INTERRUPTED) && isOpen()) { continue; } // 這個方法主要是將UNAVAILABLE(原為-2)這個狀態返回0,否則返回n return IOStatus.normalize(n); } } } 2.IOUtil.write(fd, buf, position, nd) { // .... if (src instanceof DirectBuffer) return writeFromNativeBuffer(fd, buf, position, nd); ByteBuffer bb = Util.getTemporaryDirectBuffer(rem); bb.put(buf); bb.flip(); // 這裡的pos為buf初始的position,意思是將buf重置為最初的狀態;因為目前還沒有真實的寫入到channel中 buf.position(pos); // 呼叫 int n = writeFromNativeBuffer(fd, bb, position, nd); if (n > 0) { buf.position(pos + n); } } 3.IOUtil.writeFromNativeBuffer(fd , buf , position , nd) { // ... 忽略一些獲取buf變數的程式碼 int written = 0; if (position != -1) { // pread方法只有在同步狀態下才能使用 written = nd.pwrite(fd ,((DirectBuffer)bb).address() + pos,rem, position); } else { // 其呼叫SocketDispatcher.write方法 -> FileDispatcherImpl.write0方法 written = nd.write(fd, ((DirectBuffer)bb).address() + pos, rem); } //.... } 4.FileDispatcherImpl.write0 { // 呼叫底層的write方法寫入 return convertReturnVal(env, write(fd, buf, len), JNI_FALSE); }
總結一下write的過程:
並恢復buf的position
DatagramChannel
DatagramChannel是NIO中面向Datagram(資料報)的套接字通道.
一般我們在實際程式設計中用到這個Channel的情況很少,所以我在這裡就不詳細說明了,有興趣的同學可以通過 Java NIO深入理解DatagramChannel 這篇文章瞭解
FileChannel
FileChannel是執行緒安全的,只能通過FileInputStream,FileOutputStream,RandomAccessFile的getChannel方法獲取FileChannel通道,原理是獲取到底層作業系統生成的fd(file descriptor)
FileChannel主要是對本地檔案操作的NIO中的一套新的機制,後面我們再配合IO部分的內容來仔細研究它
總結
這篇文章主要是介紹了Channel通道類在NIO程式設計中的作用,並主要講述了 ServerSocketChannel
和 SocketChannel
這兩個Channel的底層工作機制,總結一下上面的關鍵點
-
ServerSocketChannel
只支援accept事件,SocketChannel
只支援connect、read、write事件 -
SocketChannel
的讀取和寫入都需要依賴direct buffer來做中間轉換 -
SocketChannel
在connect之前會呼叫NetHooks#beforeTcpConnect
FYI
微信搜尋公眾號"一隻懶懶的coder"可關注我獲取最新動態哦!!也可掃描下方的二維碼哦!!!

一隻懶懶的coder