1. 程式人生 > >【Java TCP/IP Socket】基於NIO的TCP通訊(含程式碼)

【Java TCP/IP Socket】基於NIO的TCP通訊(含程式碼)

   NIO主要原理及使用

NIO採取通道(Channel)和緩衝區(Buffer)來傳輸和儲存資料,它是非阻塞式的I/O,即在等待連線、讀寫資料(這些都是在一執行緒以客戶端的程式中會阻塞執行緒的操作)的時候,程式也可以做其他事情,以實現執行緒的非同步操作。

   考慮一個即時訊息伺服器,可能有上千個客戶端同時連線到伺服器,但是在任何時刻只有非常少量的訊息需要讀取和分發(如果採用執行緒池或者一執行緒一客戶端方式,則會非常浪費資源),這就需要一種方法能阻塞等待,直到有一個通道可以進行I/O操作。NIO的Selector選擇器就實現了這樣的功能,一個Selector例項可以同時檢查一組通道的I/O狀態,它就

類似一個觀察者,只要我們把需要探知的SocketChannel告訴Selector,我們接著做別的事情,當有事件(比如,連線開啟、資料到達等)發生時,它會通知我們,傳回一組SelectionKey,我們讀取這些Key,就會獲得我們剛剛註冊過的SocketChannel,然後,我們從這個Channel中讀取資料,接著我們可以處理這些資料。

    Selector內部原理實際是在做一個對所註冊的Channel的輪詢訪問,不斷的輪詢(目前就這一個演算法),一旦輪詢到一個Channel有所註冊的事情發生,比如資料來了,它就會讀取Channel中的資料,並對其進行處理。

    要使用選擇器,需要建立一個Selector例項,並將其註冊到想要監控的通道上(通過Channel的方法實現)。最後呼叫選擇器的select()方法,該方法會阻塞等待,直到有一個或多個通道準備好了I/O操作或等待超時,或另一個執行緒呼叫了該選擇器的wakeup()方法。現在,在一個單獨的執行緒中,通過呼叫select()方法,就能檢查多個通道是否準備好進行I/O操作,由於非阻塞I/O的非同步特性,在檢查的同時,我們也可以執行其他任務。

   基於NIO的TCP連線的建立步驟

  服務端

    1、傳建一個Selector例項;

    2、將其註冊到各種通道,並指定每個通道上感興趣的I/O操作;

    3、重複執行:

        1)呼叫一種select()方法;

        2)獲取選取的鍵列表;

        3)對於已選鍵集中的每個鍵:

           a、獲取通道,並從鍵中獲取附件(如果為通道及其相關的key添加了附件的話);

           b、確定準備就緒的操縱並執行,如果是accept操作,將接收的通道設定為非阻塞模式,並註冊到選擇器;

           c、如果需要,修改鍵的興趣操作集;

           d、從已選鍵集中移除鍵

     客戶端

   與基於多執行緒的TCP客戶端大致相同,只是這裡是通過通道建立的連線,但在等待連線建立及讀寫時,我們可以非同步地執行其他任務。

   基於NIO的TCP通訊Demo

    下面給出一個基於NIO的TCP通訊的Demo,客戶端傳送一串字串到服務端,服務端將該字串原原本本地反饋給客戶端。

    客戶端程式碼及其詳細註釋如下:

import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class TCPEchoClientNonblocking {
	public static void main(String args[]) throws Exception{
		if ((args.length < 2) || (args.length > 3)) 
		throw new IllegalArgumentException("引數不正確");
		//第一個引數作為要連線的服務端的主機名或IP
		String server = args[0]; 
		//第二個引數為要傳送到服務端的字串
		byte[] argument = args[1].getBytes();
		//如果有第三個引數,則作為埠號,如果沒有,則埠號設為7
		int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7;
		//建立一個通道,並設為非阻塞模式
		SocketChannel clntChan = SocketChannel.open();
		clntChan.configureBlocking(false);
		//向服務端發起連線
		if (!clntChan.connect(new InetSocketAddress(server, servPort))){
			//不斷地輪詢連線狀態,直到完成連線
			while (!clntChan.finishConnect()){
				//在等待連線的時間裡,可以執行其他任務,以充分發揮非阻塞IO的非同步特性
				//這裡為了演示該方法的使用,只是一直列印"."
				System.out.print(".");  
			}
		}
		//為了與後面列印的"."區別開來,這裡輸出換行符
		System.out.print("\n");
		//分別例項化用來讀寫的緩衝區
		ByteBuffer writeBuf = ByteBuffer.wrap(argument);
		ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
		//接收到的總的位元組數
		int totalBytesRcvd = 0; 
		//每一次呼叫read()方法接收到的位元組數
		int bytesRcvd; 
		//迴圈執行,直到接收到的位元組數與傳送的字串的位元組數相等
		while (totalBytesRcvd < argument.length){
			//如果用來向通道中寫資料的緩衝區中還有剩餘的位元組,則繼續將資料寫入通道
			if (writeBuf.hasRemaining()){
				clntChan.write(writeBuf);
			}
			//如果read()接收到-1,表明服務端關閉,丟擲異常
			if ((bytesRcvd = clntChan.read(readBuf)) == -1){
				throw new SocketException("Connection closed prematurely");
			}
			//計算接收到的總位元組數
			totalBytesRcvd += bytesRcvd;
			//在等待通訊完成的過程中,程式可以執行其他任務,以體現非阻塞IO的非同步特性
			//這裡為了演示該方法的使用,同樣只是一直列印"."
			System.out.print("."); 
		}
		//打印出接收到的資料
		System.out.println("Received: " +  new String(readBuf.array(), 0, totalBytesRcvd));
		//關閉通道
		clntChan.close();
	}
}
    服務端用單個執行緒監控一組通道,程式碼如下:
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public class TCPServerSelector{
	//緩衝區的長度
	private static final int BUFSIZE = 256; 
	//select方法等待通道準備好的最長時間
	private static final int TIMEOUT = 3000; 
	public static void main(String[] args) throws IOException {
		if (args.length < 1){
			throw new IllegalArgumentException("Parameter(s): <Port> ...");
		}
		//建立一個選擇器
		Selector selector = Selector.open();
		for (String arg : args){
			//例項化一個通道
			ServerSocketChannel listnChannel = ServerSocketChannel.open();
			//將該通道繫結到指定埠
			listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));
			//配置通道為非阻塞模式
			listnChannel.configureBlocking(false);
			//將選擇器註冊到各個通道
			listnChannel.register(selector, SelectionKey.OP_ACCEPT);
		}
		//建立一個實現了協議介面的物件
		TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
		//不斷輪詢select方法,獲取準備好的通道所關聯的Key集
		while (true){
			//一直等待,直至有通道準備好了I/O操作
			if (selector.select(TIMEOUT) == 0){
				//在等待通道準備的同時,也可以非同步地執行其他任務,
				//這裡只是簡單地列印"."
				System.out.print(".");
				continue;
			}
			//獲取準備好的通道所關聯的Key集合的iterator例項
			Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
			//迴圈取得集合中的每個鍵值
			while (keyIter.hasNext()){
				SelectionKey key = keyIter.next(); 
				//如果服務端通道感興趣的I/O操作為accept
				if (key.isAcceptable()){
					protocol.handleAccept(key);
				}
				//如果客戶端通道感興趣的I/O操作為read
				if (key.isReadable()){
					protocol.handleRead(key);
				}
				//如果該鍵值有效,並且其對應的客戶端通道感興趣的I/O操作為write
				if (key.isValid() && key.isWritable()) {
					protocol.handleWrite(key);
				}
				//這裡需要手動從鍵集中移除當前的key
				keyIter.remove(); 
			}
		}
	}
}

    這裡為了使不同協議都能方便地使用這個基本的服務模式,我們把通道中與具體協議相關的處理各種I/O的操作分離了出來,定義了一個介面,如下:
import java.nio.channels.SelectionKey;
import java.io.IOException;

/**
*該介面定義了通用TCPSelectorServer類與特定協議之間的介面,
*它把與具體協議相關的處理各種I/O的操作分離了出來,
*以使不同協議都能方便地使用這個基本的服務模式。
*/
public interface TCPProtocol{
	//accept I/O形式
	void handleAccept(SelectionKey key) throws IOException;
	//read I/O形式
	void handleRead(SelectionKey key) throws IOException;
	//write I/O形式
	void handleWrite(SelectionKey key) throws IOException;
}
    介面的實現類程式碼如下:
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.ServerSocketChannel;
import java.nio.ByteBuffer;
import java.io.IOException;

public class EchoSelectorProtocol implements TCPProtocol {
	private int bufSize; // 緩衝區的長度
	public EchoSelectorProtocol(int bufSize){
    this.bufSize = bufSize;
	}

	//服務端通道已經準備好了接收新的客戶端連線
	public void handleAccept(SelectionKey key) throws IOException {
		SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
		clntChan.configureBlocking(false);
		//將選擇器註冊到連線到的客戶端通道,並指定該通道key值的屬性為OP_READ,同時為該通道指定關聯的附件
		clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
	}

	//客戶端通道已經準備好了從通道中讀取資料到緩衝區
	public void handleRead(SelectionKey key) throws IOException{
		SocketChannel clntChan = (SocketChannel) key.channel();
		//獲取該通道所關聯的附件,這裡為緩衝區
		ByteBuffer buf = (ByteBuffer) key.attachment();
		long bytesRead = clntChan.read(buf);
		//如果read()方法返回-1,說明客戶端關閉了連線,那麼客戶端已經接收到了與自己傳送位元組數相等的資料,可以安全地關閉
		if (bytesRead == -1){ 
			clntChan.close();
		}else if(bytesRead > 0){
		//如果緩衝區總讀入了資料,則將該通道感興趣的操作設定為為可讀可寫
		key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
		}
	}
	
	//客戶端通道已經準備好了將資料從緩衝區寫入通道
	public void handleWrite(SelectionKey key) throws IOException {
    //獲取與該通道關聯的緩衝區,裡面有之前讀取到的資料
    ByteBuffer buf = (ByteBuffer) key.attachment();
	//重置緩衝區,準備將資料寫入通道
    buf.flip(); 
    SocketChannel clntChan = (SocketChannel) key.channel();
	//將資料寫入到通道中
    clntChan.write(buf);
    if (!buf.hasRemaining()){ 
	//如果緩衝區中的資料已經全部寫入了通道,則將該通道感興趣的操作設定為可讀
      key.interestOps(SelectionKey.OP_READ);
    }
	//為讀入更多的資料騰出空間
    buf.compact(); 
  }

}

    執行結果如下:

   說明:以上的服務端程式,select()方法第一次能選擇出來的準備好的通道都是服務端通道,其關聯鍵值的屬性都為OP_ACCEPT,亦及有效操作都為accept,在執行handleAccept方法時,為取得連線的客戶端通道也進行了註冊,屬性為OP_READ,這樣下次輪詢呼叫select()方法時,便會檢查到對read操作感興趣的客戶端通道(當然也有可能有關聯accept操作興趣集的通道),從而呼叫handleRead方法,在該方法中又註冊了OP_WRITE屬性,那麼第三次呼叫select()方法時,便會檢測到對write操作感興趣的客戶端通道(當然也有可能有關聯read操作興趣集的通道),從而呼叫handleWrite方法。    結果:從結果中很明顯地可以看出,伺服器端在等待通道準備好的時候,執行緒沒有阻塞,而是可以執行其他任務,這裡只是簡單的列印".",客戶端在等待連線和等待資料讀寫完成的時候,執行緒沒有阻塞,也可以執行其他任務,這裡也正是簡單的列印"."。

   幾個需要注意的地方

    1、對於非阻塞SocketChannel來說,一旦已經呼叫connect()方法發起連線,底層套接字可能既不是已經連線,也不是沒有連線,而是正在連線。由於底層協議的工作機制,套接字可能會在這個狀態一直保持下去,這時候就需要迴圈地呼叫finishConnect()方法來檢查是否完成連線,在等待連線的同時,執行緒也可以做其他事情,這便實現了執行緒的非同步操作。     2、write()方法的非阻塞呼叫哦只會寫出其能夠傳送的資料,而不會阻塞等待所有資料,而後一起傳送,因此在呼叫write()方法將資料寫入通道時,一般要用到while迴圈,如: while(buf.hasRemaining())     channel.write(buf);     3、任何對key(通道)所關聯的興趣操作集的改變,都只在下次呼叫了select()方法後才會生效。     4、selectedKeys()方法返回的鍵集是可修改的,實際上在兩次呼叫select()方法之間,都必須手動將其清空,否則,它就會在下次呼叫select()方法時仍然保留在集合中,而且可能會有無用的操作來呼叫它,換句話說,select()方法只會在已有的所選鍵集上新增鍵,它們不會建立新的建集。     5、對於ServerSocketChannel來說,accept是唯一的有效操作,而對於SocketChannel來說,有效操作包括讀、寫和連線,另外,對於DatagramChannle,只有讀寫操作是有效的。