1. 程式人生 > >Socket之-非阻塞通訊

Socket之-非阻塞通訊

本篇文章觀點和例子來自 《Java網路程式設計精解》, 作者為孫衛琴, 出版社為電子工業出版社。

      對於用ServerSocket 及 Socket 編寫的伺服器程式和客戶程式, 他們在執行過程中常常會阻塞. 例如, 當一個執行緒執行 ServerSocket 的accept() 方法時, 假如沒有客戶連線, 該執行緒就會一直等到有客戶連線才從 accept() 方法返回. 再例如, 當執行緒執行 Socket 的 read() 方法時, 如果輸入流中沒有資料, 該執行緒就會一直等到讀入足夠的資料才從 read() 方法返回.

      假如伺服器程式需要同時與多個客戶通訊, 就必須分配多個工作執行緒, 讓他們分別負責與一個客戶通訊, 當然每個工作執行緒都有可能經常處於長時間的阻塞狀態.

      從 JDK1.4 版本開始, 引入了非阻塞的通訊機制. 伺服器程式接收客戶連線, 客戶程式建立與伺服器的連線, 以及伺服器程式和客戶程式收發資料的操作都可以按非阻塞的方式進行. 伺服器程式只需要建立一個執行緒, 就能完成同時與多個客戶通訊的任務.

      非阻塞的通訊機制主要由 java.nio 包(新I/O包) 中的類實現, 主要的類包括 ServerSocketChannel, SocketChannel, Selector, SelectionKey 和 ByteBuffer 等.

      本章介紹如何用 java.nio 包中的類來建立伺服器程式和客戶程式, 並且 分別採用阻塞模式和非阻塞模式來實現它們. 通過比較不同的實現方式, 可以幫助讀者理解它們的區別和適用範圍.

一. 執行緒阻塞的概念

      在生活中, 最常見的阻塞現象是公路上汽車的堵塞. 汽車在公路上快速行駛, 如果前方交通受阻, 就只好停下來等待, 等到交通暢順, 才能恢復行駛.

      執行緒在執行中也會因為某些原因而阻塞. 所有處於阻塞狀態的執行緒的共同特徵是: 放棄CPU, 暫停執行, 只有等到導致阻塞的原因消除, 才能恢復執行; 或者被其他執行緒中斷, 該執行緒會退出阻塞狀態, 並且丟擲 InterruptedException.

1.1 執行緒阻塞的原因

      導致執行緒阻塞的原因主要有以下幾方面.

  • 執行緒執行了 Thread.sleep(int n) 方法, 執行緒放棄 CPU, 睡眠 n 毫秒, 然後恢復執行.
  • 執行緒要執行一段同步程式碼, 由於無法獲得相關的同步鎖, 只好進入阻塞狀態, 等到獲得了同步鎖, 才能恢復執行.
  • 執行緒執行了一個物件的 wait() 方法, 進入阻塞狀態, 只有等到其他執行緒執行了該物件的 notify() 和 notifyAll() 方法, 才可能將其呼醒.
  • 執行緒執行 I/O 操作或進行遠端通訊時, 會因為等待相關的資源而進入阻塞狀態. 例如, 當執行緒執行 System.in.read() 方法時, 如果使用者沒有向控制檯輸入資料, 則該執行緒會一直等讀到了使用者的輸入資料才從 read() 方法返回.

     進行遠端通訊時, 在客戶程式中, 執行緒在以下情況可能進入阻塞狀態.

  • 請求與伺服器建立連線時, 即當執行緒執行 Socket 的帶引數構造方法, 或執行 Socket 的 connect() 方法時, 會進入阻塞狀態, 直到連線成功, 此執行緒才從 Socket 的構造方法或 connect() 方法返回.
  • 執行緒從 Socket 的輸入流讀入資料時, 如果沒有足夠的資料, 就會進入阻塞狀態, 直到讀到了足夠的資料, 或者到達輸入流的末尾, 或者出現了異常, 才從輸入流的 read() 方法返回或異常中斷. 輸入流中有多少資料才算足夠呢? 這要看執行緒執行的 read() 方法的型別.

> int read(): 只要輸入流中有一個位元組, 就算足夠.

> int read( byte[] buff): 只要輸入流中的位元組數目與引數buff 陣列的長度相同, 就算足夠.

> String readLine(): 只要輸入流中有一行字串, 就算足夠. 值得注意的是, InputStream 類並沒有 readLine() 方法, 在過濾流 BufferedReader 類中才有此方法. 

  • 執行緒向 Socket 的輸出流寫一批資料時, 可能會進入阻塞狀態, 等到輸出了所有的資料, 或者出現異常, 才從輸出流 的 write() 方法返回或異常中斷.
  • 呼叫 SOcket 的setSoLinger() 方法設定了關閉 Socket 的延遲時間, 那麼當執行緒執行 Socket 的 close() 方法時, 會進入阻塞狀態, 直到底層 Socket 傳送完所有剩餘資料, 或者超過了 setSoLinger() 方法設定的延遲時間, 才從 close() 方法返回.

      在伺服器程式中, 執行緒在以下情況下可能會進入阻塞狀態.

  • 執行緒執行 ServerSocket 的 accept() 方法, 等待客戶的連線, 直到接收到了客戶連線, 才從 accept() 方法返回.      
  • 執行緒從 Socket 的輸入流讀入資料時, 如果輸入流沒有足夠的資料, 就會進入阻塞狀態.
  • 執行緒向 Socket 的輸出流寫一批資料時, 可能會進入阻塞狀態, 等到輸出了所有的資料, 或者出現異常, 才從輸出流的 write() 方法返回或異常中斷.

      由此可見, 無論在伺服器程式還是客戶程式中, 當通過 Socket 的輸入流和輸出流來讀寫資料時, 都可能進入阻塞狀態. 這種可能出現阻塞的輸入和輸出操作被稱為阻塞 I/O. 與此對照, 如果執行輸入和輸出操作時, 不會發生阻塞, 則稱為非阻塞 I/O.

1.2 伺服器程式用多執行緒處理阻塞通訊的侷限

      本書第三章的第六節(建立多執行緒的伺服器) 已經介紹了伺服器程式用多執行緒來同時處理多個客戶連線的方式. 伺服器程式的處理流程如圖 4-1 所示. 主執行緒負責接收客戶的連線. 線上程池中有若干工作執行緒, 他們負責處理具體的客戶連線. 每當主執行緒接收到一個客戶連線, 就會把與這個客戶互動的任務交給一個空閒的工作執行緒去完成, 主執行緒繼續負責接收下一個客戶連線.

 伺服器程式用多執行緒處理阻塞通訊

                              圖4-1 伺服器程式用多執行緒處理阻塞通訊

      在圖4-1 總, 用粗體框標識的步驟為可能引起阻塞的步驟. 從圖中可以看出, 當主執行緒接收客戶連線, 以及工作執行緒執行 I/O 操作時, 都有可能進入阻塞狀態.

      伺服器程式用多執行緒來處理阻塞 I/O, 儘管能滿足同時響應多個客戶請求的需求, 但是有以下侷限:

      ⑴ Java 虛擬機器會為每個執行緒分配獨立的堆疊空間, 工作執行緒數目越多, 系統開銷就越大, 而且增加了 Java虛擬機器排程執行緒的負擔, 增加了執行緒之間同步的複雜性, 提高了執行緒死鎖的可能性;

      ⑵ 工作執行緒的許多時間都浪費在阻塞 I/O 操作上, Java 虛擬機器需要頻繁地轉讓 CPU 的使用權, 使進入阻塞狀態的執行緒放棄CPU, 再把CPU 分配給處於可執行狀態的執行緒.

      由此可見, 工作執行緒並不是越多越好. 如圖 4-2 所示, 保持適量的工作執行緒, 會提高伺服器的併發效能, 但是當工作執行緒的數目達到某個極限, 超出了系統的負荷時, 反而會減低併發效能, 使得多數客戶無法快速得到伺服器的響應.

 執行緒數目與併發效能的關係

                      圖4-2 執行緒數目與併發效能的更新                  

1.3 非阻塞通訊的基本思想

      假如要同時做兩件事: 燒開水和燒粥. 燒開水的步驟如下:

      鍋裡放水, 開啟煤氣爐;

      等待水燒開;                                                            //阻塞

      關閉煤氣爐, 把開水灌到水壺裡;

      燒粥的步驟如下:

      鍋裡放水和米, 開啟煤氣爐;

      等待粥燒開;                                                             //阻塞

      調整煤氣爐, 改為小火;   

      等待粥燒熟;                                                             //阻塞

      關閉煤氣爐;

      為了同時完成兩件事, 一個方案是同時請兩個人分別做其中的一件事, 這相當於採用多執行緒來同時完成多個任務. 還有一種方案是讓一個人同時完成兩件事, 這個人應該善於利用一件事的空閒時間去做另一件事, 一刻也不應該閒著:

      鍋子裡放水, 開啟煤氣爐;                      //開始燒水

      鍋子力放水和米, 開啟煤氣爐;                //開始燒粥

      while(一直等待, 直到有水燒開, 粥燒開或粥燒熟事件發生){          //阻塞

            if(水燒開)

                   關閉煤氣爐, 把開水灌到水壺裡;

            if(粥燒開)

                   調整煤氣爐, 改為小火;

            if(粥燒熟)

                   關閉煤氣爐;

            if(水已經燒開並且粥已經燒熟)

                   退出迴圈; 

      }         //這裡的煤氣爐我可以理解為每件事就有一個煤氣爐配給吧, 這也是一部分的開銷呢

                 //並且if裡面的動作必須要能快速完成的才行, 不然後面的就要排隊了

                 //如是太累的工作還是不要用這個好                                   

      這個人不斷監控燒水及燒粥的狀態, 如果發生了 “水燒開”, “粥燒開” 或 “粥燒熟” 事件, 就去處理這些事件, 處理完一件事後進行監控燒水及燒粥的狀態, 直到所有的任務都完成.

       以上工作方式也可以運用到伺服器程式中, 伺服器程式只需要一個執行緒就能同時負責接收客戶的連線, 接收各個客戶傳送的資料, 以及向各個客戶傳送響應資料. 伺服器程式的處理流程如下:

       while(一直等待, 直到有接收連線就緒事件, 讀就緒事件或寫就緒事件發生){             //阻塞

              if(有客戶連線)

                   接收客戶的連線;                                                    //非阻塞

              if(某個 Socket 的輸入流中有可讀資料)

                   從輸入流中讀資料;                                                 //非阻塞

              if(某個 Socket 的輸出流可以寫資料)

                   向輸出流寫資料;                                                    //非阻塞

       }

      以上處理流程採用了輪詢的工作方式, 當某一種操作就緒時, 就執行該操作, 否則就檢視是否還有其他就緒的操作可以執行. 執行緒不會因為某一個操作還沒有就緒, 就進入阻塞狀態, 一直傻傻地在那裡等待這個操作就緒.

      為了使輪詢的工作方式順利進行, 接收客戶的連線, 從輸入流讀資料, 以及向輸出流寫資料的操作都應該以非阻塞的方式執行. 所謂非阻塞, 就是指當執行緒執行這些方法時, 如果操作還沒有就緒, 就立即返回, 而不會一直等到操作就緒. 例如, 當執行緒接收客戶連線時, 如果沒有客戶連線, 就立即返回; 再例如, 當執行緒從輸入流中讀資料時, 如果輸入流中還沒有資料, 就立即返回, 或者如果輸入流還沒有足夠的資料, 那麼就讀取現有的資料, 然後返回. 值得注意的是, 以上 while 學校條件中的操作還是按照阻塞方式進行的, 如果未發生任何事件, 就會進入阻塞狀態, 直到接收連線就緒事件, 讀就緒事件或寫就緒事件中至少有一個事件發生時, 才會執行 while 迴圈體中的操作. 在while 迴圈體中, 一般會包含在特定條件下退出迴圈的操作.

二. java.nio 包中的主要類

        java.nio 包提供了支援非阻塞通訊的類.

  • ServerSocketChannel: ServerSocket 的替代類, 支援阻塞通訊與非阻塞通訊.
  • SocketChannel: Socket 的替代類, 支援阻塞通訊與非阻塞通訊.
  • Selector: 為ServerSocketChannel 監控接收連線就緒事件, 為 SocketChannel 監控連線就緒, 讀就緒和寫就緒事件.
  • SelectionKey: 代表 ServerSocketChannel 及 SocketChannel 向 Selector 註冊事件的控制代碼. 當一個 SelectionKey 物件位於Selector 物件的 selected-keys 集合中時, 就表示與這個 SelectionKey 物件相關的事件發生了.      

      ServerSocketChannel 及 SocketChannel 都是 SelectableChannel 的子類, 如圖 4-3 所示. SelectableChannel 類及其子類都能委託 Selector 來監控他們可能發生的一些事件, 這種委託過程也稱為註冊事件過程.

                        SelectableChannel 類及其子類的類框圖      

                                           圖4-3 SelectableChannel 類及其子類的類框圖

      ServerSocketChannel 向 Selector 註冊接收連線就緒事件的程式碼如下:

            SelectionKey key = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);                         

      SelectionKey 類的一些靜態常量表示事件型別, ServerSocketChannel 只可能發生一種事件.

  • SelectionKey.OP_ACCEPT: 接收連線就緒事件, 表示至少有了一個客戶連線, 伺服器可以接收這個連線.

     SocketChannel 可能發生以下 3 種事件.

  • SelectionKey.OP_CONNECT: 連線就緒事件, 表示客戶與伺服器的連線已經建立成功.
  • SelectionKey.OP_READ: 讀就緒事件,  表示輸入流中已經有了可讀資料, 可以執行讀操作了
  • SelectionKey.OP_WRITE: 寫就緒事件, 表示已經可以向輸入流寫資料了.

     SocketChannel 提供了接收和傳送資料的方法.

  • read(ByteBuffer buffer): 接收資料, 把它們存放到引數指定的 ByteBuffer 中.
  • write(ByteBuffer buffer): 把引數指定的 ByteBuffer 中的資料傳送出去.

     ByteBuffer 表示位元組緩衝區, SocketChannel 的 read() 和 write() 方法都會操縱 ByteBuffer. ByteBuffer 類繼承於 Buffer 類. ByteBuffer 中存放的是位元組,  為了把它們轉換為字串, 還需要用到 Charset 類, Charset 類代表字元編碼, 它提供了把位元組流轉換為字串(解碼過程) 和把字串轉換為位元組流(編碼過程) 的實用方法.

       下面幾小節分別介紹 Buffer, Charset, Channel, SelectableChannel, ServerSocketChannel, SocketChannel, Selector 和 SelectionKey 的用法.

2.1 緩衝區 Buffer

      資料輸入和輸出往往是比較耗時的操作. 緩衝區從兩個方面提高 I/O 操作的效率:

  • 減少實際的物理讀寫次數;
  • 快取區在建立時被分配記憶體, 這塊記憶體區域一直被重用, 這可以減少動態分配和回收記憶體區域的次數.

      舊I/O 類庫(對應 java.nio包) 中的 BufferedInputStream, BufferedOutputStream, BufferedReader 和 BufferedWriter 在其實現中都運用了緩衝區. java.nio 包公開了 Buffer API, 使得Java 程式可以直接控制和運用緩衝區. 如圖 4-4 所示, 顯示了 Buffer 類的層次結構.

                 Buffer 類的層次結構

                                         圖 4-4 Buffer 類的層次結構

     所有的緩衝區都有以下屬性:

  • 容量(capacity): 表示該緩衝區可以儲存多少資料.
  • 極限(limit): 表示緩衝區的當前終點, 不能對緩衝區中超過極限的區域進行讀寫操作. 極限是可以修改的, 這有利於緩衝區的重用. 例如, 假定容量100 的緩衝區已經填滿了資料, 接著程式在重用緩衝區時, 僅僅將 10 個新的資料寫入緩衝區中從位置0 到10 的區域, 這時可以將極限設為 10, 這樣就不能讀取先前的資料了. 極限是一個非負整數, 不應該大於容量.
  • 位置(position): 表示緩衝區中下一個讀寫單元的位置, 每次讀寫緩衝區的資料時, 都會改變該值, 為下一次讀寫資料作準備. 位置是一個非負整數, 不應該大於極限.

     如圖 4-5 所示, 以上 3 個屬性的關係為: 容量 ≥ 極限 ≥ 位置 ≥ 0

                   緩衝區的3個屬性

                    圖 4-5 緩衝區的 3 個屬性

     緩衝區提供了用於改變以上 3 個屬性的方法.

  • clear(): 把極限設為容量, 再把位置設為 0;
  • flip(): 把極限設為位置, 再把位置設為 0;
  • rewind(): 不改變極限, 把位置設為 0.

      Buffer 類的remaining() 方法返回緩衝區的剩餘容量, 取值等於極限-位置. Buffer 類的 compact() 方法刪除緩衝區內從 0 到當前位置position 的內容, 然後把從當前位置position 到極限limit 的內容複製到 0 到 limit-position 的區域內, 當前位置position 和極限limit 的取值也作相應的變化, 如圖 4-6 所示.

                     Buffer 類的 compact() 的作用

                                  圖4-6 Buffer 類的 compact() 的作用

         java.nio.Buffer 類是一個抽象類, 不能被例項化. 共有 8 個具體的緩衝區類, 其中最基本的緩衝區是 ByteBuffer, 它存放的資料單元是位元組. ByteBuffer 類並沒有提供公開的構造方法, 但是提供了兩個獲得 ByteBuffer 例項的靜態工廠方法.

  • allocate(int capacity): 返回一個 ByteBuffer 物件, 引數capacity 指定緩衝區的容量.
  • directAllocate(int capacity):返回一個 ByteBuffer 物件, 引數capacity 指定緩衝區的容量. 該方法返回的緩衝區稱為直接緩衝區, 它與當前作業系統能夠更好地耦合, 因此能進一步提高 I/O 操作的速度. 但是直接分配緩衝區的系統開銷很大, 因此只有在緩衝區較大並且長期存在, 或者需要經常重用時, 才使用這種緩衝區.

      除 boolean 型別以外, 每種基本型別都有對應的緩衝區類, 包括 CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer 和 ShortBuffer. 這幾個緩衝區類都有一個能夠返回自身例項的靜態工廠方法allocate(int capacity). 在 CharBuffer 中存放的資料單元為字元, 在 DoubleBuffer 中存放的資料單元為 double 資料, 依此類推. 還有一個緩衝區是 MappedByteBuffer, 它是 ByteBuffer 的子類. MappedByteBuffer 能夠把緩衝區和檔案的某個區域直接對映.

      所有具有緩衝區類都提供了讀寫緩衝區的方法:

  • get(): 相對讀. 從緩衝區的當前位置讀取一個單元的資料, 讀完後把位置加 1;
  • get(int index): 絕對讀. 從引數 index 指定的位置讀取一個單元的資料;
  • put(): 相對寫. 向緩衝區的當前位置寫入一個單元的資料, 寫完後把位置加 1;
  • put(int index): 絕對寫. 向引數 index 指定的位置寫入一個單元的資料.

2.2  字元編碼 Charset

      java.nio.Channel 類的每個例項代表特定的字元編碼型別. 如圖 4-7 所示, 把位元組序列轉換為字串的過程稱為解碼; 把字串轉換為位元組序列的過程稱為編碼.

      編碼與解碼

                             圖 4-7 編碼與解碼

      Charset 類提供了編碼與解碼的方法:

  • ByteBuffer encode(String str): 對引數 Str 指定的字串進行編碼, 把得到的位元組序列存放在一個 ByteBuffer 物件中, 並將其返回;
  • ByteBuffer encode(CharBuffer cb): 對引數 cb 指定的字元緩衝區中的字元進行編碼,把得到的位元組序列存放在一個 ByteBuffer 物件中, 並將其返回;
  • CharBuffer decode(ByteBuffer bb): 把引數 bb 指定的 ByteBuffer 中的位元組序列進行解碼, 把得到的字元序列存放在一個 CharBuffer 物件中, 並將其返回.

      Charset 類的靜態 forName(String encode) 方法返回一個 Charset 物件, 它代表引數 encode 指定的編碼型別. 例如, 以下程式碼建立了一個代表”GBK” 編碼的 Charset物件:

         Charset charset = Charset.forName(“GBK”);                                       

       Charset 類還有一個靜態方法 defaultCharset(), 它返回代表本地平臺的預設字元編碼的 Charset 物件. 

2.3 通道Channel

      通道 Channel 用來連線緩衝區與資料來源或資料匯(資料目的地). 如圖4-8 所示, 資料來源的資料經過管道到達緩衝區, 緩衝區的資料經過通道到達資料匯.

      通道的作用

                                    圖4-8 通道的作用

       如圖 4-9 所示, 顯示了 Channel 的主要層次結構.

       Channel 的主要層次結構

                                                        圖4-9 Channel 的主要層次結構

       java.nio.channels.Channel 介面只聲明瞭兩個方法.

  • close(): 關閉通道;
  • isOpen(): 判斷通道是否開啟.

      通道在建立時被開啟, 一旦關閉通道, 就不能重新打開了.

      Channel 介面的兩個最重要的子介面是 ReadableByteChannel 和 WritableByteChannel. ReadableByteChannel 介面聲明瞭 read(ByteBuffer dst) 方法, 該方法把資料來源的資料讀入引數指定的 ByteBuffer 緩衝區中; WritableByteChannel 介面聲明瞭 write(ByteBuffer src)方法, 該方法把引數指定的 ByteBuffer 緩衝區中的資料寫到資料匯中. 如圖4-10 所示, 顯示了 Channel 與 Buffer 的關係. ByteChannel 介面是一個便利介面, 它擴充套件了 ReadByteChannel 和 WritableByteChannel 介面, 因而同時支援讀寫操作.

       Channel 與 Buffer 的關係

                             圖4-10 Channel 與 Buffer   的關係

      ScatteringByteChannel 介面擴充套件了 ReadByteChannel 介面, 允許分散地讀取資料. 分散讀取資料是指單個讀取操作能填充多個緩衝區.  ScatteringByteChannel 介面聲明瞭 read(ByteBuffer[] dsts)方法, 該方法把從資料來源讀取的資料依次填充到引數指定的 ByteBuffer 陣列的各個 ByteBuffer 中. GatheringByteChannel 介面擴充套件了 WritableByteChannel 介面, 允許集中地寫入資料. 集中寫入資料是指單個寫操作能把多個緩衝區的資料寫入資料匯. GatheringByteChannel 介面聲明瞭 write(ByteBuffer[] srcs)方法, 該方法依次把引數指定的 ByteBuffer 陣列的每個 ByteBuffer 中的資料寫入資料匯. 分散讀取和集中寫資料能夠進一步提高輸入和輸出操作的速度.

     FileChannel 類是 Channel 介面的實現類, 代表一個與檔案相連的通道. 該類實現了 ByteChannel, ScatteringByteChannel, GatheringByteChannel 介面, 支援讀操作, 寫操作, 分散讀操作和集中寫操作. FileChannel 類沒有提供公開的構造方法, 一次客戶程式不能用 new 語句來構造它的實現. 不過, 在 FileInputStream, FileOutputStream 和 RandomAccessFile 類中提供了 getChannel() 方法, 該方法返回對應的 FileChannel 物件.

      SelectableChannel 也是一種通道, 它不僅支援阻塞的 I/O 操作, 還支援非阻塞的 I/O 操作. SelectableChannel 有兩個子類: ServerSocketChannel 和 SocketChannel. SocketChannel 還實現了 ByteChannel 介面, 具有 read(ByteBuffer dst) 和 write(ByteBuffer src) 方法.

       注意上面的圖4-9 Channel 的主要層次結構, 這個跟原書有點區別, 裡面的類都是 jdk1.5的, 其中 SocketChannel 是實現了 ByteChannel, ScatteringByteChannel, GatheringByteChannel 介面, SocketChannel 還有一個子類SocketChannelImpl, SocketChannelImpl 的原始碼看不到呢.

2.4 SelectableChannel 類

      SelectableChannel 類是一種支援阻塞 I/O 和非阻塞 I/O 的通道. 在非阻塞模式下, 讀寫資料不會阻塞, 並且SelectableChannel 可以向 Selector 註冊讀就緒和寫就緒等事件. Selector 負責監控這些事件, 等到事件發生時, 比如發生了讀就緒事件, SelectableChannel 就可以執行讀操作了.

      SelectableChannel 的主要方法如下:

  • public SelecotableChannel configureBlocking(boolean block) throws IOException

     數block 為true 時, 表示把 SelectableChannel 設為阻塞模式; 如果引數block 為false, 表示把 SelectableChannel 設為非阻塞模式. 預設情況下, SelectableChannel 採用阻塞模式. 該方法返回 SelectableChannel 物件本身的引用, 相當於” return this”.

  • public SelectionKey register(Selector sel, int ops) throws ClosedChannelException
  • public SelectionKey register(Selector sel, int ops, Object attachment) throws ClosedChannelException

      後兩個方法都向 Selector 註冊時間, 如以下 socketChannel( SelectableChannel 的一個子類) 向 Selector 註冊讀就緒和寫就緒事件:

         SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

       register() 方法返回一個 SelectionKey 物件, SelectionKey 用來跟蹤被註冊的事件. 第二個register() 方法還有一個Object 型別的引數 attachment, 它用於為 SelectionKey 關聯一個附件, 當被註冊事件發生後, 需要處理該事件時, 可以從 SelectionKey 中獲得這個附件, 該附件可用來包含與處理這個事件相關的資訊. 以下這兩段程式碼是等價的:

        MyHandler handler = new MyHandler();             //負責處理事件的物件
        SelectionKey key = socketChannel.register(selector, SelectioinKey.OP_READ | SelectionKey.OP_WRITE, handler );

等價於:

        MyHandler handler = new MyHandler();             //負責處理事件的物件
        SelectionKey key = socketChannel.register(selector, SelectioinKey.OP_READ | SelectionKey.OP_WRITE);
        key.attach(handler );                                           //為SelectionKey 關聯一個附件

 2.5 ServerSocketChannel 類

       SeverSocketChannel 從 SeletableChannel 中繼承了 configureBlocking() 和 register()方法. ServerSocketChannel 是 ServerSocket 的替換類, 也具有負責接收客戶連線的 accept() 方法. ServerSocket 並沒有 public 型別的構造方法, 必須通過它的靜態方法open() 來建立 ServerSocketChannel 物件. 每個ServerSocketChannel 物件都與一個ServerSocket 物件關聯. ServerSocketChannel 的 socket() 方法返回與它關聯的 ServerSocket 物件. 可通過以下方法把伺服器程序繫結到一個本地埠:

           serverSocketChannel.socket().bind(port);                                                          

       ServerSocketChannel 的主要方法如下:

  • public static ServerSocketChannel open() throws IOException

     這是 ServerSocketChannel 類的靜態工廠方法, 它返回一個 ServerSocketChannel 物件, 這個物件沒有與任何本地埠繫結, 並且處於阻塞模式.

  • public SocketChannel accept() throws IOException

      類似於 ServerSocket 的accept() 方法, 用於接收客戶的連線. 如果 ServerSocketChannel 處於非阻塞狀態, 當沒有客戶連線時, 該方法立即返回 null; 如果ServerSocketChannel 處於阻塞狀態, 當沒有客戶連線時, 它會一直阻塞下去, 直到有客戶連線就緒, 或者出現了IOException.

      值得注意的是, 該方法返回的 SocketChannel 物件處於阻塞模式, 如果希望把它改為非阻塞模式, 必須執行以下程式碼:

        socketChannel.configureBlocking(false);                                       

  • public final int validOps()

     返回 ServerSocketChannel 所能產生的事件, 這個方法總是返回 SelectionKey.OP_ACCEPT.

  • public ServerSocket socket()

     返回與 ServerSocketChannel 關聯的 ServerSocket 物件. 每個 ServerSocketChannel 物件都與一個 ServerSocket 物件關聯.

2.6 SocketChannel 類

      SocketChannel 可看作是 Socket 的替代類, 但它比 Socket 具有更多的功能. SocketChannel 不僅從 SelectableChannel 父類中繼承了 configureBlocking() 和 register() 方法, 並且實現了 ByteChannel 介面, 因此具有用於讀寫資料的 read(ByteBuffer dst) 和 write(ByteBuffer src) 方法. SocketChannel 沒有public 型別的構造方法, 必須通過它的靜態方法open() 來建立 SocketChannel 物件.

      SocketChannel 的主要方法如下:

  • public static SocketChannel open() throws IOException
  • public static SocketChannel open(SocketAddress remote) throws IOException

      SocketChannel 的靜態工廠方法open() 負責建立 SocketChannel 物件, 第二個帶引數的構造方法還會建立與遠端伺服器的連線. 在阻塞模式下及非阻塞模式下, 第二個open() 方法有不同的行為, 這與 SocketChannel 類的 connect() 方法類似, 可參見本屆 connect() 方法的介紹.

      以下兩段程式碼是等價的:

        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.connect(remote);       //remote 為 SocketAddress 型別

      等價於:

        SocketChannel socketChannel = SocketChannel.open(remote); //remote 為 SocketAddress 型別

      值得注意的是, open() 方法返回的SocketChannel 物件處於阻塞模式, 如果希望把它改為非阻塞模式, 必須執行以下程式碼:

         socketChannel.configureBlock(false);

  • public final int validOps()

      返回SocketChannel 所能產生的事件, 這個方法總是返回以下值:

        SelectionKey.OP_CONNECT | SelectionKey.OP_READ | SelectionKey.OP_WRITEN                        

  • public Socket socket()

      返回與這個SocketChannel 關聯的 Socket 物件. 每個 SocketChannel 物件都與一個 Socket 物件關聯.

  • public boolean isConnected()

      判斷底層 Socket 是否已經建立了遠端連線.

  • public boolean isConnectionPending()

      判斷是否正在進行遠端連線. 當遠端連線操作已經開始, 但是還沒有完成時, 則返回true, 否則返回false. 也就是說, 當底層Socket 還沒有開始連線, 或者已經連線成功時, 該方法都會返回false.

  • public boolean connect(SocketAddress remote)throws IOException

      使底層Socket 建立遠端連線. 當SocketChannel 處於非阻塞模式時, 如果立即連線成功, 該方法返回true, 如果不能立即連線成功, 該方法返回false, 程式過會兒必須通過呼叫finishConnect() 方法來完成連線. 當SocketChannel 處於阻塞模式, 如果立即連線成功, 該方法返回true, 如果不能立即連線成功, 將進入阻塞狀態, 直到連線成功, 或者出現 I/O 異常.

  • public boolean finishConnect() throws IOExcetion

      試圖完成連線遠端伺服器的操作. 在非阻塞模式下, 建立連線從呼叫SocketChannel 的connect() 方法開始, 到呼叫 finishConnect() 方法結束. 如果finishConnect() 方法順利完成連線, 或者在呼叫次方法之前連線已經建立, 則finishConnect() 方法立即返回true. 如果連線操作還沒有完成, 則立即返回false; 如果連線操作中遇到異常而失敗, 則丟擲響應的I/O 異常.

      在阻塞模式下, 如果連線操作還沒有完成, 則會進入阻塞狀態, 直到連線完成或者出現I/O 異常.

  • public int read(ByteBuffer dst) throws IOException

      從 Channel 中讀入若干位元組, 把他們存放到引數指定的 ByteBuffer 中. 假定執行read() 方法前, ByteBuffer 的位置為p, 剩餘容量為r, r 等於dst.remaining() 方法的返回值. 假定read() 方法實際讀入了 n 個位元組, 那麼 0 ≤ n ≤ r. read() 方法返回後, 引數 dst 引用的ByteBuffer 的位置變為 p+n, 極限保持不變, 如圖4-11 所示:

      read() 方法讀入n個位元組

                               圖4-11 read() 方法讀入 n 個位元組

      在阻塞模式下, read() 方法會爭取讀到 r 個位元組, 如果輸入流中不足 r 個位元組, 就進入阻塞狀態, 直到讀入了 r 個位元組, 或者讀到了輸入流末尾, 或者出現了 I/O 異常.

      在非阻塞模式下, read() 方法奉行能讀到多少資料就讀多少資料的原則. read() 方法讀取當前通道中的可讀資料, 有可能不足 r 個資金額, 或者為 0 個位元組, read() 方法總是立即返回, 而不會等到讀取了 r 個位元組在返回.

      read() 方法返回的實際上讀入的位元組數, 有可能為 0. 如果返回 -1, 就表示讀到了輸入流的末尾.

  • public int write(ByteBuffer src) throws IOException

      把引數 src 指定的 ByteBuffer 中的位元組寫到 Channel 中. 假定執行 write() 方法前, ByteBuffer  的位置為 p, 剩餘容量為 r, r 等於 src.remaining() 方法的返回值. 假定 write() 方法實際上向通道中寫了 n 個位元組, 那麼 0 ≤ n ≤ r. write() 方法返回後, 引數 src 引用的 ByteBuffer 的位置變為 p+n, 極限保持不變, 如圖4-12 所示:

      write() 方法輸出n個位元組

                                         圖4-12 write() 方法輸出 n 個位元組

      在阻塞模式下, write() 方法會爭取輸出 r 個位元組, 如果底層網路的輸出緩衝區不能容納 r 個位元組, 就進入阻塞狀態, 直到輸出了 r 個位元組, 或者出現了 I/O 異常.

      在非阻塞模式下, write() 方法奉行能輸出多少資料就輸出多少資料的原則, 有可能不足 r 個位元組, 或者為 0 個位元組, write() 方法總是立即返回, 而不會等到輸出 r 個位元組後再返回.

      write() 方法返回實際上輸出的位元組數, 有可能為 0.

2.7 Selector 類

      只要 ServerSocketChannel 及 SocketChannel 向 Selector 註冊了特定的事件, Selector 就會監控這些事件是否發生. SelectableChannel 的 register() 方法負責註冊事件, 該方法返回一個SelectionKey 物件, 該物件是用於跟蹤這些被註冊事件的控制代碼. 一個Selector 物件中會包含 3 種類型的 SelectionKey 集合.

  • all-keys 集合: 當前所有向Selector 註冊的 SelectionKey 的集合, Selector 的keys() 方法返回該集合.
  • selected-keys 集合: 相關時間已經被Selector 捕獲的SelectionKey 的集合. Selector 的selectedKeys() 方法返回該集合.
  • cancelled-keys 集合: 已經被取消的 SelectionKey 的集合. Selector 沒有提供訪問這種集合的方法.

      以上第二種和第三種集合都是第一種集合的子集. 對於一個新建的Selector 物件, 它的上述集合都為空.

      當執行SelectableChannel 的 register() 方法時, 該方法新建一個 SelectionKey, 並把它加入到 Selector 的all-keys 集合中.

      如果關閉了與SelectionKey 物件關聯的 Channel 物件, 或者呼叫了 SelectionKey 物件的cancel() 方法, 這個 SelectionKey 物件就會被加入到 cancelled-keys 集合中, 表示這個 SelectionKey 物件已經被取消, 在程式下一次執行 Selector 的 select() 方法時, 被取消的 SelectionKey 物件將從所有的集合(包括 all-keys 集合, selected-keys集合和cancelled-keys 集合)中刪除.

      在執行 Selector 的 select() 方法時, 如果與 SelectionKey 相關的事件發生了, 這個SelectionKey 就被加入到 selected-keys 集合中. 程式直接呼叫 selected-keys 集合的 remove() 方法, 或者呼叫它的 Iterator 的 remove() 方法, 都可以從 selected-keys 集合中刪除一個 SelectionKey 物件.

      程式不允許直接通過集合介面的 remove() 方法刪除 all-keys 集合中的 SelectionKey 物件. 如果程式試圖這樣做, 那麼會導致 UnsupportedOperationException. (all-keys 應該是一個內部類, 並且不實現remove()的方法, 繼承的物件也是沒實現這些方法的; 還有可能是new HashSet(){重寫remove()方法,直接丟擲異常},如: private static HashSet keys = new HashSet(){
  public boolean remove(Object o){
   throw new UnsupportedOperationException();
  }
 };)

      Selector 類的主要方法如下:

  •  public static Selector open() throws IOException

      這是 Selector 的靜態工廠方法, 建立一個 Selector 物件.

  • public boolean isOpen()

      判斷 Selector 是否處於開啟狀態. Selector 物件建立後就處於開啟狀態, 當呼叫那個了 Selector 物件的 close() 方法, 它就進入關閉狀態.

  • public Set<SelectionKey> keys()

      返回 Selector 的 all-keys 集合, 它包含了所有與 Selector 關聯的 SelectionKey 物件.

  • public int selectNow() throws IOException

      返回相關事件已經發生的 SelectionKey 物件的數目. 該方法採用非阻塞的工作方式, 返回當前相關時間已經發生的 SelectionKey 物件的數目, 如果沒有, 就立即返回 0 .

  • public int select() throws IOException
  • public int select(long timeout) throws IOException

     該方法採用阻塞的工作方式, 返回相關事件已經發生的 SelectionKey 物件的數目, 如果一個也沒有, 就進入阻塞狀態, 直到出現以下情況之一, 才從 select() 方法中返回.

>至少有一個 SelectionKey 的相關事件已經發生;

>其他執行緒呼叫了 Selector 的 wakeup() 方法, 導致執行 select() 方法的執行緒立即從 select() 方法中返回.

>當前執行 select() 方法的執行緒被其他執行緒中斷.

>超出了等待時間. 該時間由 select(long timeout) 方法的引數 timeout 設定, 單位為毫秒. 如果等待超時, 就會正常返回, 但不會丟擲超時異常. 如果程式呼叫的是不帶引數的 select() 方法, 那麼永遠不會超時, 這意味著執行 select) 方法的執行緒進入阻塞狀態後, 永遠不會因為超時而中斷.

  •  public Selector wakeup()

     呼醒執行 Selector 的 select() 方法(也同樣設用於 select(long timeout) 方法) 的執行緒. 當執行緒A 執行 Selector 物件的 wakeup() 方法時, 如果執行緒B 正在執行同一個 Selector 物件的 select() 方法, 或者執行緒B 過一會兒會執行這個 Selector 物件的 select() 方法, 那麼執行緒B 在執行 select() 方法時, 會立即從 select() 方法中返回, 而不會阻塞. 假如, 執行緒B 已經在 select() 方法中阻塞了, 也會立即被呼醒, 從select() 方法中返回.

      wakeup() 方法只能呼醒執行select() 方法的執行緒B 一次. 如果執行緒B 在執行 select() 方法時被呼醒後, 以後在執行 select() 方法, 則仍舊按照阻塞方式工作, 除非執行緒A 再次呼叫 Selector 物件的 wakeup() 方法.

  • public void close() throws IOException

      關閉 Selector. 如果有其他執行緒正執行這個Selector 的select() 方法並且處於阻塞狀態, 那麼這個執行緒會立即返回. close() 方法使得 Selector 佔用的所有資源都被釋放, 所有與 Selector 關聯的 SelectionKey 都被取消.

2.8 SelectionKey 類

     ServerSocketChannel 或 SocketChannel 通過 register() 方法向 Selector 註冊事件時, register() 方法會建立一個 SelectionKey 物件, 這個 SelectionKey 物件是用來跟蹤註冊事件的控制代碼. 在 SelectionKey 物件的有效期間, Selector 會一直監控與 SelectionKey 物件相關的事件, 如果事件發生, 就會把 SelectionKey 物件加入到 selected-keys 集合中. 在以下情況下, SelectionKey 物件會失效, 這意味著 Selector 再也不會監控與它相關的事件了:

⑴ 程式呼叫 SelectionKey 的 cancel() 方法;

⑵ 關閉與 SelectionKey 關聯的 Channel;

⑶ 與 SelectionKey 關聯的 Selector 被關閉.

     在 SelectionKey 中定義了 4 種事件, 分別用 4 個 int 型別的常量來表示.

  • SelectionKey.OP_ACCEPT: 接收連線就緒事件, 表示伺服器監聽到了客戶連線, 伺服器可以接收這個連線了. 常量值為 16.(00010000)
  • SelectionKey.OP_CONNECT: 連線就緒事件, 表示客戶與伺服器的連線已經建立成功. 常量值為 8.(00001000)
  • SelectionKey.OP_READ: 讀就緒事件, 表示通道中已經有了可讀資料, 可以執行讀操作了. 常量值為 1.(00000001)
  • SelectionKey.OP_WRITE: 寫就緒事件, 表示已經可以向通道寫資料了. 常量值為 4.(00000100)

      以上常量分別佔據不同的二進位制位, 因此可以通過二進位制的或運算 “|”, 來將它們進行任意組合. 一個 SelectionKey 物件中包含兩種型別的事件.

  • 所有感興趣的事件: SelectionKey 的 interestOps() 方法返回所有感興趣的事件. 假如, 假定返回值為 SelectionKey.OP_WRITE | SelectionKey.OP_READ, 就表示這個 SelectionKey 對讀就緒和寫就緒事件感興趣. 與之關聯的 Selector 物件會負責監控這些事件. 當通過 SelectableChannel 的 register() 方法註冊事件時, 可以在引數中指定 SelectionKey 感興趣的事件. 假如, 以下程式碼表明新建的 SelectionKey 對連線就緒和讀就緒事件感興趣:

          SelectionKey key = socketChannel.register(selector,  SelectionKey.OP_CONNECT | SelectionKey.OP_READ );

       SelectionKey 的 interestOps(int ops) 方法用於為 SelectionKey 物件增加一個感興趣的事件. 假如, 以下程式碼使得 SelectionKey 增加了一個感興趣的事件:

          key.interestOps( SelectionKey.OP_WRITE );                                                     

  • 所有已經發生的事件: SelectionKey 的 readyOps() 方法返回所有已經發生的事件. 假如, 假定返回值為 SelectionKey.OP_WRITE | SelectionKey.OP_READ , 表示讀就緒和寫就緒事件發生了, 這意味著與之關聯的 SocketChannel 物件可以進行讀操作和寫操作了.

     當程式呼叫一個 SelectableChannel ()