1. 程式人生 > >javaNIO原理(含程式碼)及與 同步阻塞IO 、偽非同步IO比較

javaNIO原理(含程式碼)及與 同步阻塞IO 、偽非同步IO比較

一.同步阻塞IO

        BIO就是阻塞式的IO,網路通訊中對於多客戶端的連入,伺服器端總是與客戶端數量一致的執行緒去處理每個客戶端任務,即,客戶端與執行緒數1:1,並且進行讀寫操作室阻塞的,當有你成千上完的客戶端進行連線,就導致伺服器不斷的建立新的執行緒,最後導致低通資源不足,後面的客戶端不能連線伺服器,並且連線入的客戶端並不是總是在於伺服器進行互動,很可能就只是佔用著資源而已。

二.偽非同步IO

   偽非同步IO對同步IO進行了優化,後端通過一個執行緒池和任務佇列去處理所有客戶端的請求,當用完後在歸還給執行緒池,執行緒池的原理和資料庫連線池的原理很像,他減少了執行緒建立和銷燬的時間,無論多少客戶端的連線都是這些固定的執行緒數量去處理,這在大量客戶端與伺服器資訊互動量很少的情況下,是一種很好的處理方式,但是想一種情況,當有一個客戶端的讀取資訊非常慢時,伺服器對其的寫操作時會很慢,甚至會阻塞很長時間,因為執行緒池中的執行緒是有限的,當有客戶端需要分配執行緒時,就會導致新任務在佇列中一直等待阻塞的客戶端釋放執行緒。當任務佇列已經滿時,就會有大量的使用者發生連線超時。其實,偽非同步IO也是同步阻塞IO。

.javaNIO原理

       為了解決上面的兩種阻塞IO的缺陷,java在1.4版本開始加入NIO也稱為new IO,至於他是什麼模型我們先來看這個部落格。看完之後就會很清楚了。

目前在UNP劃分的角度,非同步IO只有AIO。網路通訊中,NIO也提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道來實現,可以設定阻塞餘非阻塞兩種模式,為了實現高負載高併發都採取非阻塞的模式。NIO採用緩衝區BUFFER,實現對資料的讀寫操作,緩衝區是固定大小,並由內部狀態記錄有多少資料被放入或者取出。與阻塞IO不同,阻塞IO採用阻塞式流(Stream)的方式進行讀寫,流是單向的只能向一個方向讀資料或者寫資料,而通道是雙向的,可以同時在通道上傳送和讀取資料,而且是非阻塞的,在沒有資料可讀可寫時可以去做別的事情。

       NIO改進了上面的一對一或者M:N的模型。伺服器僅採用一個一個執行緒去處理所有客戶端執行緒,這就需要建立一個selector,並將其註冊到想要監控的通道上(注意是通過channel的方法來實現),並返回一個selectorKey例項(包含通道和select以及感興趣的操作),selector就好像是一個觀察者,通過不斷輪詢所註冊的一組通道上有沒有等待的操作發生,當等待事件發生的時候可以做其他事情,當有信道出現感興趣的操作,則該通道就進入就緒狀態。

      Slector的select方法阻塞等待又沒有就緒的通道,當出現就緒的通道或者等待超時返回,就緒通道的個數,若等待超時則返回-1,selectedKeys方法返回就緒的通道。

   下面附程式碼

這是處理感興趣通道的介面,因為他可以放在多個伺服器上所以把他做成了介面。

package Nio;

import java.io.IOException;
import java.nio.channels.SelectionKey;

public interface TCPProtocol {
	void handleAccept(SelectionKey key) throws IOException;
	void handleRead(SelectionKey key) throws IOException;
	void handleWrite(SelectionKey key) throws IOException;
}
<span style="font-family:FangSong_GB2312;">這是對上面介面的具體實現</span>
package Nio;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class SelectorProtocol implements TCPProtocol {
	private int bufSize ;
	public SelectorProtocol(int buffsize){
		this.bufSize = buffsize;
	}

	 //服務端通道已經準備好了接收新的客戶端連線  
    public void handleAccept(SelectionKey key) throws IOException {  
        SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();  
        clntChan.configureBlocking(false);  
        //將選擇器註冊到連線到的客戶端通道,並指定該通道key值的屬性為OP_READ,同時為該通道指定關聯的附件  
        clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));  
    }  
  
    //客戶端通道已經準備好了從通道中讀取資料到緩衝區  
    public void handleRead(SelectionKey key) throws IOException{  
        SocketChannel clntChan = (SocketChannel) key.channel();  
        //獲取該通道所關聯的附件,這裡為緩衝區  
        ByteBuffer buf = (ByteBuffer) key.attachment();  
        long bytesRead = clntChan.read(buf);  
        //如果read()方法返回-1,說明客戶端關閉了連線,那麼客戶端已經接收到了與自己傳送位元組數相等的資料,可以安全地關閉  
        if (bytesRead == -1){   
            clntChan.close();  
        }else if(bytesRead > 0){  
        //如果緩衝區總讀入了資料,則將該通道感興趣的操作設定為為可讀可寫  
        key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);  
        }  
    }  
      
    //客戶端通道已經準備好了將資料從緩衝區寫入通道  
    public void handleWrite(SelectionKey key) throws IOException {  
    //獲取與該通道關聯的緩衝區,裡面有之前讀取到的資料  
    ByteBuffer buf = (ByteBuffer) key.attachment();  
    //重置緩衝區,準備將資料寫入通道  
    buf.flip();   
    SocketChannel clntChan = (SocketChannel) key.channel();  
    //將資料寫入到通道中  
    clntChan.write(buf);  
    if (!buf.hasRemaining()){   
    //如果緩衝區中的資料已經全部寫入了通道,則將該通道感興趣的操作設定為可讀  
      key.interestOps(SelectionKey.OP_READ);  
    }  
    //為讀入更多的資料騰出空間  
    buf.compact();   
  }  
}
客戶端
package Nio;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

public class TCPEchoClientNonblocking {
	public static void main(String args[]) throws Exception{
		//第一個引數作為要連線的服務端的主機名或IP
		String server = "localhost"; 
		//第二個引數為要傳送到服務端的字串
		byte[] argument = "nihaopengyou".getBytes();
		//如果有第三個引數,則作為埠號,如果沒有,則埠號設為7
		int servPort = 2002;
		//建立一個通道,並設為非阻塞模式
		SocketChannel clntChan = SocketChannel.open();
		clntChan.configureBlocking(false);
		//向服務端發起連線
		if (!clntChan.connect(new InetSocketAddress(server, servPort))){
			//不斷地輪詢連線狀態,直到完成連線
			while (!clntChan.finishConnect()){
				//在等待連線的時間裡,可以執行其他任務,以充分發揮非阻塞IO的非同步特性
				//這裡為了演示該方法的使用,只是一直列印"."
				System.out.print(".");  
			}
		}
		//為了與後面列印的"."區別開來,這裡輸出換行符
		System.out.print("\n");
		//分別例項化用來讀寫的緩衝區
		ByteBuffer writeBuf = ByteBuffer.wrap(argument);
		ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
		//接收到的總的位元組數
		int totalBytesRcvd = 0; 
		//每一次呼叫read()方法接收到的位元組數
		int bytesRcvd; 
		//迴圈執行,直到接收到的位元組數與傳送的字串的位元組數相等
		while (totalBytesRcvd < argument.length){
			//如果用來向通道中寫資料的緩衝區中還有剩餘的位元組,則繼續將資料寫入通道
			if (writeBuf.hasRemaining()){
				clntChan.write(writeBuf);
			}
			//如果read()接收到-1,表明服務端關閉,丟擲異常
			if ((bytesRcvd = clntChan.read(readBuf)) == -1){
				throw new SocketException("Connection closed prematurely");
			}
			//計算接收到的總位元組數
			totalBytesRcvd += bytesRcvd;
			//在等待通訊完成的過程中,程式可以執行其他任務,以體現非阻塞IO的非同步特性
			//這裡為了演示該方法的使用,同樣只是一直列印"."
			System.out.print("."); 
		}
		//打印出接收到的資料
		System.out.println("Received: " +  new String(readBuf.array(), 0, totalBytesRcvd));
		//關閉通道
		clntChan.close();
	}
}

伺服器
package Nio;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.util.Iterator;

public class TCPServerSelector{
	//緩衝區的長度
	private static final int BUFSIZE = 256; 
	//select方法等待通道準備好的最長時間
	private static final int TIMEOUT = 3000; 
	public static void main(String[] args) throws IOException {
		if (args.length < 1){
			throw new IllegalArgumentException("Parameter(s): <Port> ...");
		}
		//建立一個選擇器
		Selector selector = Selector.open();
		for (String arg : args){
			//例項化一個通道
			ServerSocketChannel listnChannel = ServerSocketChannel.open();
			//將該通道繫結到指定埠
			listnChannel.socket().bind(new InetSocketAddress(Integer.parseInt(arg)));
			System.out.println("啟動伺服器"+Integer.parseInt(arg));
			//配置通道為非阻塞模式
			listnChannel.configureBlocking(false);
			//將選擇器註冊到各個通道
			listnChannel.register(selector, SelectionKey.OP_ACCEPT);
		}
		//建立一個實現了協議介面的物件
		TCPProtocol protocol = new SelectorProtocol(BUFSIZE);
		//不斷輪詢select方法,獲取準備好的通道所關聯的Key集
		while (true){
			//一直等待,直至有通道準備好了I/O操作
			if (selector.select(TIMEOUT) == 0){
				//在等待通道準備的同時,也可以非同步地執行其他任務,
				//這裡只是簡單地列印"."
				System.out.print(".");
				continue;
			}
			//獲取準備好的通道所關聯的Key集合的iterator例項
			Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
			//迴圈取得集合中的每個鍵值
			while (keyIter.hasNext()){
				SelectionKey key = keyIter.next(); 
				//如果服務端通道感興趣的I/O操作為accept
				if (key.isAcceptable()){
					protocol.handleAccept(key);
				}
				//如果客戶端通道感興趣的I/O操作為read
				if (key.isReadable()){
					protocol.handleRead(key);
				}
				//如果該鍵值有效,並且其對應的客戶端通道感興趣的I/O操作為write
				if (key.isValid() && key.isWritable()) {
					protocol.handleWrite(key);
				}
				//這裡需要手動從鍵集中移除當前的key
				keyIter.remove(); 
			}
		}
	}
}

執行結果: