1. 程式人生 > >用Java實現非阻塞通訊 和阻塞通訊

用Java實現非阻塞通訊 和阻塞通訊

用ServerSocket和Socket來編寫伺服器程式和客戶程式,是Java網路程式設計的最基本的方式。這些伺服器程式或客戶程式在執行過程中常常會阻塞。例如當一個執行緒執行ServerSocket的accept()方法時,假如沒有客戶連線,該執行緒就會一直等到有了客戶連線才從accept()方法返回。再例如當執行緒執行Socket的read()方法時,如果輸入流中沒有資料,該執行緒就會一直等到讀入了足夠的資料才從read()方法返回。

假如伺服器程式需要同時與多個客戶通訊,就必須分配多個工作執行緒,讓它們分別負責與一個客戶通訊,當然每個工作執行緒都有可能經常處於長時間的阻塞狀態。

從JDK1.4版本開始,引入了非阻塞的通訊機制。伺服器程式接收客戶連線、客戶程式建立與伺服器的連線,以及伺服器程式和客戶程式收發資料的操作都可以按非阻塞的方式進行。伺服器程式只需要建立一個執行緒,就能完成同時與多個客戶通訊的任務。

非阻塞的通訊機制主要由java.nio包(新I/O包)中的類實現,主要的類包括ServerSocketChannel、SocketChannel、Selector、SelectionKey和ByteBuffer等。

一、執行緒阻塞

在生活中,最常見的阻塞現象是公路上汽車的堵塞。汽車在公路上快速執行,如果前方交通受阻,就只好停下來等待,等到公路順暢,才能恢復執行。

執行緒在執行中也會因為某些原因而阻塞。所有處於阻塞狀態的執行緒的共同特徵是:放棄CPU,暫停執行,只有等到導致阻塞的原因消除,才能恢復執行;或者被其他執行緒中斷,該執行緒會退出阻塞狀態,並且丟擲InterruptedException。

1.執行緒阻塞的原因

導致執行緒阻塞的原因主要有以下方面:

  • 執行緒執行了Thread.sleep(int n)方法,執行緒放棄CPU,睡眠n毫秒,然後恢復執行。
  • 執行緒要執行一段同步程式碼,由於無法獲得相關的同步鎖,只好進入阻塞狀態,等到獲得了同步鎖,才能恢復執行。
  • 執行緒執行了一個物件的wait()方法,進入阻塞狀態,只有等到其他執行緒執行了該物件的notify()或notifyAll()方法,才可能將其喚醒。
  • 執行緒執行I/O操作或進行遠端通訊時,會因為等待相關的資源而進入阻塞狀態。例如當執行緒執行System.in.read()方法時,如果使用者沒有向控制檯輸入資料,則該執行緒會一直等讀到了使用者的輸入資料才從read()方法返回。

進行遠端通訊時,在客戶程式中,執行緒在以下情況可能進入阻塞狀態:

  • 請求與伺服器建立連線時,即當執行緒執行Socket的帶引數的構造方法,或執行Socket的connect()方法時,會進入阻塞狀態,直到連線成功,此執行緒才從Socket的構造方法或connect()方法返回。
  • 執行緒從Socket的輸入流讀入資料時,如果沒有足夠的資料,就會進入阻塞狀態,直到讀到了足夠的資料,或者到達輸入流的末尾,或者出現了異常,才從輸入流的read()方法返回或異常中斷。輸入流中有多少資料才算足夠呢?這要看執行緒執行的read()方法的型別:
  1.  
    1. int read():只要輸入流中有一個位元組,就算足夠。
    2. int read(byte[] buff):只要輸入流中的位元組數目與引數buff陣列的長度相同就算足夠。
    3. String readLine():只要輸入流中有一行字串,就算足夠。值得注意的是InputStream類並沒有readLine()方法,在過濾流BufferedReader類中才有此方法。
  • 執行緒向Socket的輸出流寫一批資料時,可能會進入阻塞狀態,等到輸出了所有的資料,或者出現異常,才從輸出流的write()方法返回或異常中斷。
  • 當呼叫Socket的setSoLinger()方法設定了關閉Socket的延遲時間,那麼當執行緒執行Socket的close()方法時,會進入阻塞狀態,直到底層Socket傳送完所有剩餘資料,或者超過了setSoLinger()方法設定的延遲時間,才從close()方法返回。

在伺服器程式中,執行緒在以下情況可能會進入阻塞狀態:

  • 執行緒執行ServerSocket的accept()方法,等待客戶的連線,直到接收到了客戶連線,才從accept()方法返回。
  • 執行緒從Socket的輸入流讀入資料時, 如果輸入流沒有足夠的資料,就會進入阻塞狀態。
  • 執行緒向Socket的輸出流寫一批資料時,可能會進入阻塞狀態,等到輸出了所有的資料,或者出現異常,才從輸出流的write()方法返回或異常中斷。

由此可見,無論是在伺服器程式還是客戶程式中,當通過Socket的輸入流和輸出流來讀寫資料時,都可能進入阻塞狀態。這種可能出現阻塞的輸入和輸出操作被稱為阻塞I/O。與此對照,如果執行輸入和輸出操作時,不會發生阻塞,則稱為非阻塞I/O。

2.伺服器程式用多執行緒處理阻塞通訊的侷限

圖1顯示了伺服器程式用多執行緒來同時處理多個客戶連線的工作流程。主執行緒負責接收客戶的連線。線上程池中有若干工作執行緒,它們負責處理具體的客戶連線。每當主執行緒接收到一個客戶連線,主執行緒就會把與這個客戶互動的任務交一個空閒的工作執行緒去完成,主執行緒繼續負責接收下一個客戶連線。

圖1  伺服器程式用多執行緒處理阻塞通訊

在圖1中,用粗體框標識的步驟為可能引起阻塞的步驟。可以看出,當主執行緒接收客戶連線,以及工作執行緒執行I/O操作時,都有可能進入阻塞狀態。

伺服器程式用多執行緒來處理阻塞I/O,儘管能滿足同時響應多個客戶請求的需求,但是有以下侷限:

(1)Java虛擬機器會為每個執行緒分配獨立的堆疊空間,工作執行緒數目越多,系統開銷就越大,而且增加了Java虛擬機器排程執行緒的負擔,增加了執行緒之間同步的複雜性,提高了執行緒死鎖的可能性。

(2)工作執行緒的許多時間都浪費在阻塞I/O操作上,Java虛擬機器需要頻繁地轉讓CPU的使用權,使進入阻塞狀態的執行緒放棄CPU,再把CPU分配給處於可執行狀態的執行緒。

由此可見,工作執行緒並不是越多越好。如圖2所示,保持適量的工作執行緒,會提高伺服器的併發效能,但是當工作執行緒的數目到達某個極限,超出了系統的負荷時,反而會降低併發效能,使得多數客戶無法快速得伺服器的響應。

併發效能

圖2執行緒數目與併發技能的關係

3.非阻塞通訊的基本思想

假如同事要做兩件事:燒開水和燒粥。燒開水的步驟如下:

鍋裡放水,開啟煤氣爐;

等待水燒開; //阻塞

關閉煤氣爐,把開水灌到水壺裡;

燒燒粥的步驟如下:

鍋裡放水和米,開啟煤氣爐;

等待粥燒開; //阻塞

調整煤氣爐,改為小火;

等待粥燒熟; //阻塞

關閉煤氣爐;

為了同時完成兩件事,一種方案是同時請兩個人分別做其中的一件事,這相當於採用多執行緒來同時完成多個任務。還有一種方案是讓一個人同時完成兩件事,這個人應該善於利用一件事的空閒時間去做另一件事,這個人一刻也不應該閒著:

鍋裡放水,開啟煤氣爐; //開始燒開水

鍋裡放水和米,開啟煤氣爐; //開始燒粥

while(一直等待,直到有水燒開、粥燒開或粥燒熟事件發生){  //阻塞

if(水燒開)

關閉煤氣爐,把開水灌到水壺裡;

if(粥燒開)

調整煤氣爐,改為小火;

if(粥燒熟)

關閉煤氣爐;

}

這個人不斷監控燒水以及燒粥的狀態,如果發生了“水燒開”、“粥燒開”或“粥燒熟”事件,就去處理這些事件,處理完一件事後繼續監控燒水以及燒粥的狀態,直到所有的任務都完成。

以上工作方式也可以運用到伺服器程式中,伺服器程式只需要一個執行緒就能同時負責接收客戶的連線、接收各個客戶傳送的資料,以及向各個客戶傳送響應資料。伺服器程式的處理流程如下:

while(一直等待,直到有接收連線就緒事件、讀就緒事件或寫就緒事件發生){ //阻塞

if(有客戶連線)

接收客戶的連線;  //非阻塞

if(某個Socket的輸入流中有可讀資料)

從輸入流中讀資料;  //非阻塞

if(某個Socket的輸出流可以寫資料)

向輸出流寫資料;  //非阻塞

}

以上處理流程採用了輪詢的工作方式,當某一種操作就緒,就執行該操作,否則就察看是否還有其他就緒的操作可以執行。執行緒不會因為某一個操作還沒有就緒,就進入阻塞狀態,一直傻傻地在那裡等待這個操作就緒。

為了使輪詢的工作方式順利進行,接收客戶的連線、從輸入流讀資料、以及向輸出流寫資料的操作都應該以非阻塞的方式執行。所謂非阻塞,就是指當執行緒執行這些方法時,如果操作還沒有就緒,就立即返回,而不會一直等到操作就緒。例如當執行緒接收客戶連線時,如果沒有客戶連線,就立即返回;再例如當執行緒從輸入流中讀資料時,如果輸入流中還沒有資料,就立即返回,或者如果輸入流還沒有足夠的資料,那麼就讀取現有的資料,然後返回。值得注意的是,以上while迴圈條件中的操作還是按照阻塞方式進行的,如果未發生任何事件,就會進入阻塞狀態,直到接收連線就緒事件、讀就緒事件或寫就緒事件中至少有一個事件發生,此時就會執行while迴圈體中的操作。

二、java.nio包中的主要類

java.nio包提供了支援非阻塞通訊的類,主要包括:

  • ServerSocketChannel:ServerSocket的替代類,支援阻塞通訊與非阻塞通訊。
  • SocketChannel:Socket的替代類,支援阻塞通訊與非阻塞通訊。
  • Selector:為ServerSocketChannel監控接收連線就緒事件,為SocketChannel監控連線就緒、讀就緒和寫就緒事件。
  • SelectionKey:代表ServerSocketChannel以及SocketChannel向Selector註冊事件的控制代碼。當一個SelectionKey物件位於Selector物件的selected-keys集合中,就表示與這個SelectionKey物件相關的事件發生了。

ServerSocketChannel以及SocketChannel都是SelectableChannel的子類,如圖3所示。SelectableChannel類以及其子類都能委託Selector來監控它們可能發生的一些事件,這種委託過程也稱為註冊事件過程。

圖3  SelectableChannel類及其子類的類框圖

ServerSocketChannel向Selector註冊接收連線就緒事件的程式碼如下:

SelectionKey key=serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT);

SelectionKey類的一些靜態常量表示事件型別,ServerSocketChannel只可能發生一種事件:

  • SelectionKey.OP_ACCEPT:接收連線就緒事件,表示至少有了一個客戶連線,伺服器可以接收這個連線。

SocketChannel可能發生以下三種事件:

  • SelectionKey.OP_CONNECT:連線就緒事件,表示客戶與伺服器的連線已經建立成功。
  • SelectionKey.OP_READ:讀就緒事件,表示輸入流中已經有了可讀資料,可以執行讀操作了。
  • SelectionKey.OP_WRITE:寫就緒事件,表示已經可以向輸出流寫資料了。

SocketChannel提供了接收和傳送資料的方法:

  • read(ByteBuffer buffer):接收資料,把它們存放到引數指定的ByteBuffer中。
  • write(ByteBuffer buffer):把引數指定的ByteBuffer中的資料傳送出去。

ByteBuffer表示位元組緩衝區,SocketChannel的read()和write()方法都會操縱ByteBuffer。ByteBuffer類繼承於Buffer類。ByteBuffer中存放的是位元組,為了把它們轉換為字串,還需要用到Charset類,Charset類代表字元編碼,它提供了把位元組流轉換為字串(解碼過程)和把字串轉換為位元組流(編碼過程)的實用方法。

三、非阻塞程式設計例項 

1.建立非阻塞的EchoServer

在非阻塞模式下,EchoServer只需要啟動一個主執行緒,就能同時處理三件事:

● 接收客戶的連線。

● 接收客戶傳送的資料。

● 向客戶發回響應資料。

EchoServer委託Selector來負責監控接收連線就緒事件、讀就緒事件和寫就緒事件,如果有特定事件發生,就處理該事件。

EchoServer類的構造方法負責啟動伺服器,把它繫結到一個本地埠,程式碼如下:

  1. //建立一個Selector物件 
  2. selector = Selector.open(); 
  3. //建立一個ServerSocketChannel物件 
  4. serverSocketChannel = ServerSocketChannel.open(); 
  5. //使得在同一個主機上關閉了伺服器程式,緊接著再啟動該伺服器程式時,可以順利繫結到相同的埠 
  6. serverSocketChannel.socket().setReuseAddress(true); 
  7. //使ServerSocketChannel工作於非阻塞模式 
  8. serverSocketChannel.configureBlocking(false); 
  9. //把伺服器程序與一個本地埠繫結 
  10. serverSocketChannel.socket().bind(new InetSocketAddress(port)); 

EchoServer類的service()方法負責處理本節開頭所說的三件事,體現其主要流程的程式碼如下:

  1. public void service() throws IOException 
  2.     serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 
  3.     while( selector.select() > 0 ) 
  4.     { //第一層while迴圈 
  5.         Set readyKeys = selector.selectedKeys(); //獲得Selector的selected-keys集合 
  6.         Iterator it = readyKeys.iterator(); 
  7.         while( it.hasNext() ) 
  8.         { //第二層while迴圈 
  9.             SelectionKey key = null; 
  10.             try 
  11.             { //處理SelectionKey 
  12.                 key = (SelectionKey)it.next(); //取出一個SelectionKey 
  13.                 it.remove(); //把SelectionKey從Selector的selected-key集合中刪除 
  14.                 if( key.isAcceptable() ) 
  15.                 { 
  16.                     //處理接收連線就緒事件; 
  17.                 } 
  18.                 if( key.isReadable() ) 
  19.                 { 
  20.                     //處理讀就緒事件; 
  21.                 } 
  22.                 if( key.isWritable() ) 
  23.                 { 
  24.                     //處理寫就緒事件; 
  25.                 } 
  26.             } 
  27.             catch( IOException e ) 
  28.             { 
  29.                 e.printStackTrace(); 
  30.                 try 
  31.                 { 
  32.                     if( key != null ) 
  33.                     { 
  34.                         //使這個SelectionKey失效, 使得Selector不再監控這個SelectionKey感興趣的事件 
  35.                         key.cancel(); 
  36.                         key.channel().close(); //關閉與這個SelectionKey關聯的SocketChannel 
  37.                     } 
  38.                 } 
  39.                 catch( Exception ex ) 
  40.                 { 
  41.                     e.printStackTrace(); 
  42.                 } 
  43.             } 
  44.         }//#while 
  45.     }//#while 

在service()方法中,首先由ServerSocketChannel向Selector註冊接收連線就緒事件。如果Selector監控到該事件發生,就會把相應的SelectionKey物件加入到selected-keys集合中。service()方法接下來在第一層while迴圈中不斷詢問Selector已經發生的事件,然後依次處理每個事件。

Selector的select()方法返回當前相關事件已經發生的SelectionKey的個數。如果當前沒有任何事件發生,select()方法就會阻塞下去,直到至少有一個事件發生。Selector的selectedKeys()方法返回selected-keys集合,它存放了相關事件已經發生的SelectionKey物件。

service()方法在第二層while迴圈中,從selected-keys集合中依次取出每個SelectionKey物件,把它從selected-keys集合中刪除,然後呼叫isAcceptable()、isReadable()和isWritable()方法判斷到底是哪種事件發生了,從而作出相應的處理。處理每個SelectionKey的程式碼放在一個try語句中,如果出現異常,就會在catch語句中使這個SelectionKey失效,並且關閉與之關聯的Channel。

(1)處理接收連線就緒事件

service()方法中處理接收連線就緒事件的程式碼如下:

  1. if( key.isAcceptable() ) 
  2.     //獲得與SelectionKey關聯的ServerSocketChannel 
  3.     ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); 
  4.     //獲得與客戶連線的SocketChannel 
  5.     SocketChannel socketChannel = (SocketChannel)ssc.accept(); 
  6.     System.out.println("接收到客戶連線,來自:" + socketChannel.socket().getInetAddress() + ":" 
  7.             + socketChannel.socket().getPort()); 
  8.     //把SocketChannel設定為非阻塞模式 
  9.     socketChannel.configureBlocking(false); 
  10.     //建立一個用於存放使用者傳送來的資料的緩衝區 
  11.     ByteBuffer buffer = ByteBuffer.allocate(1024); 
  12.     //SocketChannel向Selector註冊讀就緒事件和寫就緒事件, 關聯了一個buffer附件 
  13.     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); 

如果SelectionKey的isAcceptable()方法返回true,就意味著這個 SelectionKey所感興趣的接收連線就緒事件已經發生了。service()方法首先通過SelectionKey的channel()方法獲得與之關聯的ServerSocketChannel物件,然後呼叫ServerSocketChannel的accept()方法獲得與客戶連線的SocketChannel物件。這個SocketChannel物件預設情況下處於阻塞模式。如果希望它執行非阻塞的I/O操作,需要呼叫它的configureBlocking(false)方法。SocketChannel呼叫Selector的register()方法來註冊讀就緒事件和寫就緒事件,還向register()方法傳遞了一個ByteBuffer型別的引數,這個ByteBuffer將作為附件與新建的SelectionKey物件關聯。

(2)處理讀就緒事件

如果SelectionKey的isReadable()方法返回true,就意味著這個SelectionKey所感興趣的讀就緒事件已經發生了。EchoServer類的receive()方法負責處理這一事件:

  1. public void receive(SelectionKey key) throws IOException 
  2.     //獲得與SelectionKey關聯的附件 
  3.     ByteBuffer buffer = (ByteBuffer)key.attachment(); 
  4.     //獲得與SelectionKey關聯的SocketChannel 
  5.     SocketChannel socketChannel = (SocketChannel)key.channel(); 
  6.     //建立一個ByteBuffer,用於存放讀到的資料 
  7.     ByteBuffer readBuff = ByteBuffer.allocate(32); 
  8.     socketChannel.read(readBuff); 
  9.     readBuff.flip(); 
  10.     //把buffer的極限設為容量 
  11.     buffer.limit(buffer.capacity()); 
  12.     //把readBuff中的內容拷貝到buffer中,假定buffer的容量足夠大,不會出現緩衝區溢位異常 
  13.     buffer.put(readBuff); 

在receive()方法中,先獲得與這個SelectionKey關聯的ByteBuffer和SocketChannel。SocketChannel每次讀到的資料都被新增到這個ByteBuffer,在程式中,由buffer變數引用這個ByteBuffer物件。在非阻塞模式下,socketChannel.read(readBuff)方法讀到多少資料是不確定的,假定讀到的位元組為n個,那麼“0<=n<readBuff”的容量。EchoServer要求每接收到客戶的一行字串XXX(也就是字串以“/r/n”結尾),就返回字串echo:XXX。由於無法保證socketChannel.read(readBuff)方法一次讀入一行字串,因此只好把它每次讀入的資料都放到buffer中,當這個buffer中湊足了一行字串,再把它傳送給客戶。

receive()方法的許多程式碼都涉及對ByteBuffer的三個屬性(position、limit和capacity)的操作,圖4演示了以上readBuff和buffer變數的三個屬性的變化過程。假定SocketChannel的read()方法讀入了6個位元組,把它存放在readBuff中,並假定buffer中原來有10個位元組,buffer.put(readBuff)方法把readBuff中的6個位元組拷貝到buffer中,buffer中最後有16個位元組。

圖4  receive()方法操縱readBuff和buffer的過程

(3)處理寫就緒事件

如果SelectionKey的isWritable()方法返回true,就意味著這個SelectionKey所感興趣的寫就緒事件已經發生了。EchoServer類的send()方法負責處理這一事件:

  1. public void send(SelectionKey key) throws IOException 
  2.     //獲得與SelectionKey關聯的ByteBuffer 
  3.     ByteBuffer buffer = (ByteBuffer)key.attachment(); 
  4.     //獲得與SelectionKey關聯的SocketChannel 
  5.     SocketChannel socketChannel = (SocketChannel)key.channel(); 
  6.     buffer.flip(); //把極限設為位置,把位置設為0 
  7.     //按照GBK編碼,把buffer中的位元組轉換為字串 
  8.     String data = decode(buffer); 
  9.     //如果還沒有讀到一行資料,就返回 
  10.     if( data.indexOf("/r/n") == -1 ) 
  11.         return; 
  12.     //擷取一行資料 
  13.     String outputData = data.substring(0, data.indexOf("/n") + 1); 
  14.     System.out.print(outputData); 
  15.     //把輸出的字串按照GBK編碼,轉換為位元組,把它放在outputBuffer中 
  16.     ByteBuffer outputBuffer = encode("echo:" + outputData); 
  17.     //輸出outputBuffer中的所有位元組 
  18.     while( outputBuffer.hasRemaining() ) 
  19.         socketChannel.write(outputBuffer); 
  20.     //把outputData字串按照GBK編碼,轉換為位元組,把它放在ByteBuffer中  
  21.     ByteBuffer temp = encode(outputData); 
  22.     //把buffer的位置設為temp的極限 
  23.     buffer.position(temp.limit()); 
  24.     //刪除buffer中已經處理的資料 
  25.     buffer.compact(); 
  26.     //如果已經輸出了字串“bye/r/n”,就使SelectionKey失效,並關閉SocketChannel 
  27.     if( outputData.equals("bye/r/n") ) 
  28.     { 
  29.         key.cancel(); 
  30.         socketChannel.close(); 
  31.         System.out.println("關閉與客戶的連線"); 
  32.     } 

EchoServer的receive()方法把讀入的資料都放到一個ByteBuffer中,send()方法就從這個ByteBuffer中取出資料。如果ByteBuffer中還沒有一行字串,就什麼也不做,直接退出send()方法;否則,就從ByteBuffer中取出一行字串XXX,然後向客戶傳送echo:XXX。接著,send()方法把ByteBuffer中的字串XXX刪除。如果send()方法處理的字串為“bye/r/n”,就使SelectionKey失效,並關閉SocketChannel,從而斷開與客戶的連線。

(4)編碼與解碼

在ByteBuffer中存放的是位元組,它表示字串的編碼。而程式需要把位元組轉換為字串,才能進行字串操作,比如判斷裡面是否包含“/r/n”,以及擷取子字串。EchoServer類的實用方法decode()負責解碼,也就是把位元組序列轉換為字串:

public String decode(ByteBuffer buffer) //解碼
{
CharBuffer charBuffer = charset.decode(buffer);
return charBuffer.toString();
}

decode()方法中的charset變數是EchoServer類的成員變數,它表示GBK中文編碼,它的定義如下:

private Charset charset=Charset.forName("GBK");

在send()方法中,當通過SocketChannel的write(ByteBuffer buffer)方法傳送資料時,write(ByteBuffer buffer)方法不能直接傳送字串,而只能傳送ByteBuffer中的位元組。因此程式需要對字串進行編碼,把它們轉換為位元組序列,放在ByteBuffer中,然後再發送。

ByteBuffer outputBuffer=encode("echo:"+outputData);

while(outputBuffer.hasRemaining())

  socketChannel.write(outputBuffer);

EchoServer類的實用方法encode()負責編碼,也就是把字串轉換為位元組序列:

public ByteBuffer encode(String str) //編碼
{
  return charset.encode(str);
}

(5)在非阻塞模式下確保傳送一行資料

在send()方法的outputBuffer中存放了字串echo:XXX的編碼。在非阻塞模式下,SocketChannel.write(outputBuffer)方法並不保證一次就把outputBuffer中的所有位元組傳送完,而是奉行能傳送多少就傳送多少的原則。如果希望把outputBuffer中的所有位元組傳送完,需要採用以下迴圈:

while(outputBuffer.hasRemaining())  //hasRemaining()方法判斷是否還有未處理的位元組

  socketChannel.write(outputBuffer);

(6)刪除ByteBuffer中的已處理資料

與SelectionKey關聯的ByteBuffer附件中存放了讀操作與寫操作的共享資料。receive()方法把讀到的資料放入ByteBuffer,而send()方法從ByteBuffer中一行行地取出資料。當send()方法從ByteBuffer中取出一行字串XXX,就要把字串從ByteBuffer中刪除。在send()方法中,outputData變數就表示取出的一行字串XXX,程式先把它編碼為位元組序列,放在一個名為temp的ByteBuffer中。接著把buffer的位置設為temp的極限,然後呼叫buffer的compact()方法刪除代表字串XXX的資料。

ByteBuffer temp=encode(outputData);

buffer.position(temp.limit());

buffer.compact();

圖5演示了以上程式碼操縱buffer的過程。圖5中假定temp中有10個位元組,buffer中本來有16個位元組,buffer.compact()方法刪除緩衝區開頭的10個位元組,最後剩下6個位元組。

圖5  從buffer中刪除已經處理過的一行字串XXX

下例程1是EchoServer的源程式。

  1. //例程1  EchoServer.java(非阻塞模式) 
  2. import java.io.*; 
  3. import java.nio.*; 
  4. import java.nio.channels.*; 
  5. import java.nio.charset.*; 
  6. import java.net.*; 
  7. import java.util.*; 
  8. public class EchoServer 
  9.     private Selector selector = null; 
  10.     private ServerSocketChannel serverSocketChannel = null; 
  11.     private int port = 8000; 
  12.     private Charset charset = Charset.forName("GBK"); 
  13.     public EchoServer() throws IOException 
  14.     { 
  15.         selector = Selector.open(); 
  16.         serverSocketChannel = ServerSocketChannel.open(); 
  17.         serverSocketChannel.socket().setReuseAddress(true); 
  18.         serverSocketChannel.configureBlocking(false); 
  19.         serverSocketChannel.socket().bind(new InetSocketAddress(port)); 
  20.         System.out.println("伺服器啟動"); 
  21.     } 
  22.     public void service() throws IOException 
  23.     { 
  24.         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); 
  25.         while( selector.select() > 0 ) 
  26.         { 
  27.             Set readyKeys = selector.selectedKeys(); 
  28.             Iterator it = readyKeys.iterator(); 
  29.             while( it.hasNext() ) 
  30.             { 
  31.                 SelectionKey key = null; 
  32.                 try 
  33.                 { 
  34.                     key = (SelectionKey)it.next(); 
  35.                     it.remove(); 
  36.                     if( key.isAcceptable() ) 
  37.                     { 
  38.                         ServerSocketChannel ssc = (ServerSocketChannel)key.channel(); 
  39.                         SocketChannel socketChannel = (SocketChannel)ssc.accept(); 
  40.                         System.out.println("接收到客戶連線,來自:" + socketChannel.socket().getInetAddress() 
  41.                                 + ":" + socketChannel.socket().getPort()); 
  42.                         socketChannel.configureBlocking(false); 
  43.                         ByteBuffer buffer = ByteBuffer.allocate(1024); 
  44.                         socketChannel.register(selector, SelectionKey.OP_READ 
  45.                                 | SelectionKey.OP_WRITE, buffer); 
  46.                     } 
  47.                     if( key.isReadable() ) 
  48.                     { 
  49.                         receive(key); 
  50.                     } 
  51.                     if( key.isWritable() ) 
  52.                     { 
  53.                         send(key); 
  54.                     } 
  55.                 } 
  56.                 catch( IOException e ) 
  57.                 { 
  58.                     e.printStackTrace(); 
  59.                     try 
  60.                     { 
  61.                         if( key != null ) 
  62.                         { 
  63.                             key.cancel(); 
  64.                             key.channel().close(); 
  65.                         } 
  66.                     } 
  67.                     catch( Exception ex ) 
  68.                     { 
  69.                         e.printStackTrace(); 
  70.                     } 
  71.                 } 
  72.             }//#while 
  73.         }//#while 
  74.     } 
  75.     public void send(SelectionKey key) throws IOException 
  76.     { 
  77.         ByteBuffer buffer = (ByteBuffer)key.attachment(); 
  78.         SocketChannel socketChannel = (SocketChannel)key.channel(); 
  79.         buffer.flip(); //把極限設為位置,把位置設為0 
  80.         String data = decode(buffer); 
  81.         if( data.indexOf("/r/n") == -1 ) 
  82.             return; 
  83.         String outputData = data.substring(0, data.indexOf("/n") + 1); 
  84.         System.out.print(outputData); 
  85.         ByteBuffer outputBuffer = encode("echo:" + outputData); 
  86.         //傳送一行字串 
  87.         while( outputBuffer.hasRemaining() ) 
  88.             socketChannel.write(outputBuffer); 
  89.         ByteBuffer temp = encode(outputData); 
  90.         buffer.position(temp.limit()); 
  91.         buffer.compact(); //刪除已經處理的字串 
  92.         if( outputData.equals("bye/r/n") ) 
  93.         { 
  94.             key.cancel(); 
  95.             socketChannel.close(); 
  96.             System.out.println("關閉與客戶的連線"); 
  97.         } 
  98.     } 
  99.     public void receive(SelectionKey key) throws IOException 
  100.     { 
  101.         ByteBuffer buffer = (ByteBuffer)key.attachment(); 
  102.         SocketChannel socketChannel = (SocketChannel)key.channel(); 
  103.         ByteBuffer readBuff = ByteBuffer.allocate(32); 
  104.         socketChannel.read(readBuff); 
  105.         readBuff.flip(); 
  106.         buffer.limit(buffer.capacity()); 
  107.         buffer.put(readBuff); //把讀到的資料放到buffer中 
  108.     } 
  109.     public String decode(ByteBuffer buffer) 
  110.     { //解碼 
  111.         CharBuffer charBuffer = charset.decode(buffer); 
  112.         return charBuffer.toString(); 
  113.     } 
  114.     public ByteBuffer encode(String str) 
  115.     { //編碼 
  116.         return charset.encode(str); 
  117.     } 
  118.     public static void main(String args[]) throws Exception 
  119.     { 
  120.         EchoServer server = new EchoServer(); 
  121.         server.service(); 
  122.     } 

2.在EchoServer中混合用阻塞模式與非阻塞模式

在例程1中,EchoServer的ServerSocketChannel以及SocketChannel都被設定為非阻塞模式,這使得接收連線、接收資料和傳送資料的操作都採用非阻塞模式,EchoServer採用一個執行緒同時完成這些操作。假如有許多客戶請求連線,可以把接收客戶連線的操作單獨由一個執行緒完成,把接收資料和傳送資料的操作由另一個執行緒完成,這可以提高伺服器的併發效能。

負責接收客戶連線的執行緒按照阻塞模式工作,如果收到客戶連線,就向Selector註冊讀就緒和寫就緒事件,否則進入阻塞狀態,直到接收到了客戶的連線。負責接收資料和傳送資料的執行緒按照非阻塞模式工作,只有在讀就緒或寫就緒事件發生時,才執行相應的接收資料和傳送資料操作。

例程2是EchoServer類的源程式。其中receive()、send()、decode()和encode()方法的程式碼與例程1的EchoServer類相同,為了節省篇幅,不再重複顯示。

  1. //例程2  EchoServer.java(混合使用阻塞模式與非阻塞模式) 
  2. import java.io.*; 
  3. import java.nio.*; 
  4. import java.nio.channels.*; 
  5. import java.nio.charset.*; 
  6. import java.net.*; 
  7. import java.util.*; 
  8. public class EchoServer 
  9.     private Selector selector = null; 
  10.     private ServerSocketChannel serverSocketChannel = null; 
  11.     private int port = 8000; 
  12.     private Charset charset = Charset.forName("GBK"); 
  13.     public EchoServer() throws IOException 
  14.     { 
  15.         selector = Selector.open(); 
  16.         serverSocketChannel = ServerSocketChannel.open(); 
  17.         serverSocketChannel.socket().setReuseAddress(true); 
  18.         serverSocketChannel.socket().bind(new InetSocketAddress(port)); 
  19.         System.out.println("伺服器啟動"); 
  20.     } 
  21.     public void accept() 
  22.     { 
  23.         for( ;; ) 
  24.         { 
  25.             try 
  26.             { 
  27.                 SocketChannel socketChannel = serverSocketChannel.accept(); 
  28.                 System.out.println("接收到客戶連線,來自:" + socketChannel.socket().getInetAddress() + ":" 
  29.                         + socketChannel.socket().getPort()); 
  30.                 socketChannel.configureBlocking(false); 
  31.                 ByteBuffer buffer = ByteBuffer.allocate(1024); 
  32.                 synchronized(gate) 
  33.                 { 
  34.                     selector.wakeup(); 
  35.                     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, 
  36.                             buffer); 
  37.                 } 
  38.             } 
  39.             catch( IOException e ) 
  40.             { 
  41.                 e.printStackTrace(); 
  42.             } 
  43.         } 
  44.     } 
  45.     private Object gate = new Object(); 
  46.     public void service() throws IOException 
  47.     { 
  48.         for( ;; ) 
  49.         { 
  50.             synchronized(gate) 
  51.             { 
  52.             } 
  53.             int n = selector.select(); 
  54.             if( n == 0 ) 
  55.                 continue; 
  56.             Set readyKeys = selector.selectedKeys(); 
  57.             Iterator it = readyKeys.iterator(); 
  58.             while( it.hasNext() ) 
  59.             { 
  60.                 SelectionKey key = null; 
  61.                 try 
  62.                 { 
  63.                     key = (SelectionKey)it.next(); 
  64.                     it.remove(); 
  65.                     if( key.isReadable() ) 
  66.                     { 
  67.                         receive(key); 
  68.                     } 
  69.                     if( key.isWritable() ) 
  70.                     { 
  71.                         send(key); 
  72.                     } 
  73.                 } 
  74.                 catch( IOException e ) 
  75.                 { 
  76.                     e.printStackTrace(); 
  77.                     try 
  78.                     { 
  79.                         if( key != null ) 
  80.                         { 
  81.                             key.cancel(); 
  82.                             key.channel().close(); 
  83.                         } 
  84.                     } 
  85.                     catch( Exception ex ) 
  86.                     { 
  87.                         e.printStackTrace(); 
  88.                     } 
  89.                 } 
  90.             }//#while 
  91.         }//#while 
  92.     } 
  93.     public void send(SelectionKey key)throws IOException{…} 
  94.     public void receive(SelectionKey key)throws IOException{…} 
  95.     public String decode(ByteBuffer buffer){…} 
  96.     public ByteBuffer encode(String str){…} 
  97.     public static void main(String args[]) throws Exception 
  98.     { 
  99.         final EchoServer server = new EchoServer(); 
  100.         Thread accept = new Thread() 
  101.         { 
  102.             public void run() 
  103.             { 
  104.                 server.accept(); 
  105.             } 
  106.         }; 
  107.         accept.start(); 
  108.         server.service(); 
  109.     } 

以上EchoServer類的構造方法與例程1的EchoServer類的構造方法基本相同,唯一的區別是,在本例中, ServerSocketChannel採用預設的阻塞模式,即沒有呼叫以下方法:

serverSocketChannel.configureBlocking(false);

EchoServer類的accept()方法負責接收客戶連線,ServerSocketChannel的accept()方法工作於阻塞模式,如果沒有客戶連線,就會進入阻塞狀態,直到接收到了客戶連線。接下來呼叫socketChannel.configureBlocking(false)方法把SocketChannel設為非阻塞模式,然後向Selector註冊讀就緒和寫就緒事件。

EchoServer類的service()方法負責接收和傳送資料,它在一個無限for迴圈中,不斷呼叫Selector的select()方法查尋已經發生的事件,然後作出相應的處理。

在EchoServer類的main()方法中,定義了一個匿名執行緒(暫且稱它為Accept執行緒),它負責執行EchoServer的accept()方法。執行main()方法的主執行緒啟動了Accept執行緒後,主執行緒就開始執行EchoServer的service()方法。因此當EchoServer啟動後,共有兩個執行緒在工作,Accept執行緒負責接收客戶連線,主執行緒負責接收和傳送資料:

  1. public static void main(String args[]) throws Exception 
  2.     final EchoServer server = new EchoServer(); 
  3.     Thread accept = new Thread() 
  4.     { //定義Accept執行緒 
  5.         public void run() 
  6.         { 
  7.             server.accept(); 
  8.         } 
  9.     }; 
  10.     accept.start(); //啟動Accept執行緒 
  11.     server.service(); //主執行緒執行service()方法 

當Accept執行緒開始執行以下方法時:

socketChannel.register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,buffer);

如果主執行緒正好在執行selector.select()方法,而且處於阻塞狀態,那麼Accept執行緒也會進入阻塞狀態。兩個執行緒都處於阻塞狀態,很有可能導致死鎖。導致死鎖的具體情形為:Selector中尚沒有任何註冊的事件,即all-keys集合為空,主執行緒執行selector.select()方法時將進入阻塞狀態,只有Accept執行緒向Selector註冊了事件,並且該事件發生後,主執行緒才會從selector.select()方法中返回。假如Selector中尚沒有任何註冊的事件,此時Accept執行緒呼叫socketChannel.register()方法向Selector註冊事件,由於主執行緒正在selector.select()方法中阻塞,這使得Accept執行緒也在socketChannel.register()方法中阻塞。Accept執行緒無法向Selector註冊事件,而主執行緒沒有任何事件可以監控,所以這兩個執行緒都將永遠阻塞下去。

為了避免死鎖,程式必須保證當Accept執行緒正在通過socketChannel.register()方法向Selector註冊事件時,不允許主執行緒正在selector.select()方法中阻塞。

為了協調Accept執行緒和主執行緒,EchoServer類在以下程式碼前加了同步標記。當Accept執行緒開始執行這段程式碼時,必須先獲得gate物件的同步鎖,然後進入同步程式碼塊,先執行Selector物件的wakeup()方法,假如此時主執行緒正好在執行selector.select()方法,而且處於阻塞狀態,那麼主執行緒就會被喚醒,立即退出selector.select()方法。

  1. synchronized(gate) 
  2. { //Accept執行緒執行這個同步程式碼塊 
  3.     selector.wakeup(); 
  4.     socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE, buffer); 

主執行緒被喚醒後,在下一次迴圈中又會執行selector.select()方法,為了保證讓Accept執行緒先執行完socketChannel.register()方法,再讓主執行緒執行selector.select()方法,主執行緒必須先獲得gate物件的同步鎖:

for(;;){

  //一個空的同步程式碼塊,其作用是為了讓主執行緒等待Accept執行緒執行完同步程式碼塊

  synchronized(gate){}  //主執行緒執行這個同步程式碼塊

int n = selector.select();

}

假如Accept執行緒還沒有執行完同步程式碼塊,就不會釋放gate物件的同步鎖,這使得主執行緒必須等待片刻,等到Accept執行緒執行完同步程式碼塊,釋放了gate物件的同步鎖,主執行緒才能恢復執行,再次執行selector.select()方法。

3.建立非阻塞的EchoClient

對於客戶與伺服器之間的通訊,按照它們收發資料的協調程度來區分,可分為同步通訊和非同步通訊。同步通訊是指甲方向乙方傳送了一批資料後,必須等接收到了乙方的響應資料後,再發送下一批資料。非同步通訊是指傳送資料和接收資料的操作互不干擾,各自獨立進行。值得注意的是,通訊的兩端並不要求都採用同樣的通訊方式,一方採用同步通訊方式時,另一方可以採用非同步通訊方式。

同步通訊要求一個I/O操作完成之後,才能完成下一個I/O操作,用阻塞模式更容易實現它。非同步通訊允許傳送資料和接收資料的操作各自獨立進行,用非阻塞模式更容易實現它。例程1和例程2介紹的EchoServer都採用非同步通訊,每次接收資料時,能讀到多少資料,就讀多少資料,並不要求必須讀到一行資料後,才能執行傳送資料的操作。

例程3的EchoClient類利用非阻塞模式來實現非同步通訊。在EchoClient類中,定義了兩個ByteBuffer:sendBuffer和receiveBuffer。EchoClient把使用者向控制檯輸入的資料存放到sendBuffer中,並且把sendBuffer中的資料傳送給遠端伺服器;EchoClient把從遠端伺服器接收到的資料存放在receiveBuffer中,並且把receiveBuffer中的資料列印到控制檯。圖6顯示了這兩個Buffer的作用。

圖6  sendBuffer和receiveBuffer的作用

  1. //例程3  EchoClient.java(非阻塞模式) 
  2. import java.net.*; 
  3. import java.nio.channels.*; 
  4. import java.nio.*; 
  5. import java.io.*; 
  6. import java.nio.charset.*; 
  7. import java.util.*; 
  8. public class EchoClient 
  9.     private SocketChannel socketChannel = null; 
  10.     private ByteBuffer sendBuffer = ByteBuffer.allocate(1024); 
  11.     private ByteBuffer receiveBuffer = ByteBuffer.allocate(1024); 
  12.     private Charset charset = Charset.forName("GBK"); 
  13.     private Selector selector; 
  14.     public EchoClient() throws IOException 
  15.     { 
  16.         socketChannel = SocketChannel.open(); 
  17.         InetAddress ia = InetAddress.getLocalHost(); 
  18.         InetSocketAddress isa = new InetSocketAddress(ia, 8000); 
  19.         socketChannel.connect(isa); //採用阻塞模式連線伺服器 
  20.         socketChannel.configureBlocking(false); //設定為非阻塞模式 
  21.         System.out.println("與伺服器的連線建立成功"); 
  22.         selector = Selector.open(); 
  23.     } 
  24.     public static void main(String args[]) throws IOException 
  25.     { 
  26.         final EchoClient client = new EchoClient(); 
  27.         Thread receiver = new Thread() 
  28.         { //建立Receiver執行緒 
  29.             public void run() 
  30.             { 
  31.                 client.receiveFromUser(); //接收使用者向控制檯輸入的資料 
  32.             } 
  33.         }; 
  34.         receiver.start(); //啟動Receiver執行緒 
  35.         client.talk(); 
  36.     } 
  37.     public void receiveFromUser() 
  38.     { //接收使用者從控制檯輸入的資料,把它放到sendBuffer中 
  39.         try 
  40.         { 
  41.             BufferedReader localReader = new BufferedReader(new InputStreamReader(System.in)); 
  42.             String msg = null; 
  43.             while( (msg = localReader.readLine()) != null ) 
  44.             { 
  45.                 synchronized(sendBuffer) 
  46.                 { 
  47.                     sendBuffer.put(encode(msg + "/r/n")); 
  48.                 } 
  49.                 if( msg.equals("bye") ) 
  50.                     break; 
  51.             } 
  52.         } 
  53.         catch( IOException e ) 
  54.         { 
  55.             e.printStackTrace(); 
  56.         } 
  57.     } 
  58.     public void talk() throws IOException 
  59.     { //接收和傳送資料 
  60.         socketChannel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE); 
  61.         while( selector.select() > 0 ) 
  62.         { 
  63.             Set readyKeys = selector.selectedKeys(); 
  64.             Iterator it = readyKeys.iterator(); 
  65.             while( it.hasNext() ) 
  66.             { 
  67.                 SelectionKey key = null; 
  68.                 try 
  69.                 { 
  70.                     key = (SelectionKey)it.next(); 
  71.                     it.remove(); 
  72.                     if( key.isReadable() ) 
  73.                     { 
  74.                         receive(key); 
  75.                     } 
  76.                     if( key.isWritable() ) 
  77.                     { 
  78.                         send(key); 
  79.                     } 
  80.                 } 
  81.                 catch( IOException e ) 
  82.                 { 
  83.