1. 程式人生 > >Java NIO怎麼理解通道和非阻塞?

Java NIO怎麼理解通道和非阻塞?

nio引入了buffer、channel、selector等概念。

通道相當於之前的I/O流。

“通道”太抽象了。java解釋不清的東西只能看它底層是怎麼解釋的——作業系統的I/O控制,通道控制方式?

I/O裝置:CPU——通道——裝置控制器——I/O裝置

(通道和裝置控制器的關係是多對多,裝置控制器和I/O裝置的關係也是多對多。)

1.CPU在執行使用者程式時遇到I/O請求,根據使用者的I/O請求生成通道程式(也可以是事先編好的)。放到記憶體中,並把該通道程式首地址放入CAW中。
2.CPU執行“啟動I/O”指令,啟動通道工作。

3.通道接收“啟動I/O”指令訊號,從CAW(記錄下一條通道指令存放的地址)中取出通道程式首地址,並根據此地址取出通道程式的第一條指令,放入CCW(記錄正在執行的通道指令)中;同時向CPU發回答訊號,通知“啟動I/O”指令完成完畢,CPU可繼續執行。

4.與此同時,通道開始執行通道程式,進行物理I/O操作。當執行完一條指令後,如果還有下一條指令則繼續執行;否則表示傳輸完成,同時自行停止,通知CPU轉去處理通道結束事件,並從CCW中得到有關通道狀態。

如此一來,主處理器只要發出一個I/O操作命令,剩下的工作完全由通道負責。I/O操作結束後,I/O通道會發出一箇中斷請求,表示相應操作已完成。

通道控制方式是對資料塊進行處理的,並非位元組。

I/O分兩段:1.資料從I/O裝置到核心緩衝區。2.資料從核心緩衝區到應用緩衝區


I/O型別:

1.非同步I/O不會產生阻塞,程式不會等待I/O完成,繼續執行程式碼,等I/O完成了再執行一個什麼回撥函式,程式碼執行效率高。很容易聯想到ajax。這個一般用於I/O操作不影響之後的程式碼執行。

2.阻塞I/O,程式發起I/O操作後,程序阻塞,CPU轉而執行其他程序,I/O的兩個步驟完成後,向CPU傳送中斷訊號,程序就緒,等待執行。

3.非阻塞I/O並非都不阻塞,其實是第一步不阻塞,第二部阻塞。程式發起I/O操作後,程序一直檢查第一步是否完成,CPU一直在迴圈詢問,完成後,程序阻塞直到完成第二步。明白了!這個是“站著茅坑不拉屎”,CPU利用率最低的。邏輯和作業系統的程式直接控制方式一樣。

阻塞不阻塞描述的是發生I/O時當前執行緒的狀態。

以上是作業系統的I/O,那麼java的nio又是怎樣的呢?

個人覺得是模仿了通道控制方式。

先看看nio的示例程式碼:

服務端TestReadServer.java

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TestReadServer {
	
	/*標識數字*/
	private  int flag = 0;
	/*緩衝區大小*/
	private  int BLOCK = 1024*1024*10;
	/*接受資料緩衝區*/
	private  ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
	/*傳送資料緩衝區*/
	private  ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
	private  Selector selector;

	public TestReadServer(int port) throws IOException {
		// 開啟伺服器套接字通道
		ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
		// 伺服器配置為非阻塞
		serverSocketChannel.configureBlocking(false);
		// 檢索與此通道關聯的伺服器套接字
		ServerSocket serverSocket = serverSocketChannel.socket();
		// 進行服務的繫結
		serverSocket.bind(new InetSocketAddress(port));
		// 通過open()方法找到Selector
		selector = Selector.open();
		// 註冊到selector,等待連線
		serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		System.out.println("Server Start----"+port+":");
	}


	// 監聽
	private void listen() throws IOException {
		while (true) {
			// 選擇一組鍵,並且相應的通道已經開啟
			selector.select();
			// 返回此選擇器的已選擇鍵集。
			Set<SelectionKey> selectionKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = selectionKeys.iterator();
			while (iterator.hasNext()) {		
				SelectionKey selectionKey = iterator.next();
				iterator.remove();
				handleKey(selectionKey);
			}
		}
	}

	// 處理請求
	private void handleKey(SelectionKey selectionKey) throws IOException {
		// 接受請求
		ServerSocketChannel server = null;
		SocketChannel client = null;
		String receiveText;
		String sendText;
		int count=0;
		// 測試此鍵的通道是否已準備好接受新的套接字連線。
		if (selectionKey.isAcceptable()) {
			// 返回為之建立此鍵的通道。
			server = (ServerSocketChannel) selectionKey.channel();
			// 接受到此通道套接字的連線。
			// 此方法返回的套接字通道(如果有)將處於阻塞模式。
			client = server.accept();
			// 配置為非阻塞
			client.configureBlocking(false);
			// 註冊到selector,等待連線
			client.register(selector, SelectionKey.OP_READ);
		} else if (selectionKey.isReadable()) {
			// 返回為之建立此鍵的通道。
			client = (SocketChannel) selectionKey.channel();
			//將緩衝區清空以備下次讀取
			receivebuffer.clear();
			//讀取伺服器傳送來的資料到緩衝區中
			System.out.println(System.currentTimeMillis());
			count = client.read(receivebuffer);	
			System.out.println(System.currentTimeMillis() + "~"+count);
		} 
	}

	/**
	 * @param args
	 * @throws IOException
	 */
	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub
		int port = 1234;
		TestReadServer server = new TestReadServer(port);
		server.listen();
	}
}
客戶端TestReadClient.java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class TestReadClient {

	/*標識數字*/
	private static int flag = 0;
	/*緩衝區大小*/
	private static int BLOCK = 1024*1024*10;
	/*接受資料緩衝區*/
	private static ByteBuffer sendbuffer = ByteBuffer.allocate(BLOCK);
	/*傳送資料緩衝區*/
	private static ByteBuffer receivebuffer = ByteBuffer.allocate(BLOCK);
	/*伺服器端地址*/
	private final static InetSocketAddress SERVER_ADDRESS = new InetSocketAddress(
			"localhost", 1234);

	public static void main(String[] args) throws IOException {
		// TODO Auto-generated method stub
		// 開啟socket通道
		SocketChannel socketChannel = SocketChannel.open();
		// 設定為非阻塞方式
		socketChannel.configureBlocking(false);
		// 開啟選擇器
		Selector selector = Selector.open();
		// 註冊連線服務端socket動作
		socketChannel.register(selector, SelectionKey.OP_CONNECT);
		// 連線
		socketChannel.connect(SERVER_ADDRESS);
		// 分配緩衝區大小記憶體
		
		Set<SelectionKey> selectionKeys;
		Iterator<SelectionKey> iterator;
		SelectionKey selectionKey;
		SocketChannel client;
		String receiveText;
		String sendText;
		int count=0;

		while (true) {
			//選擇一組鍵,其相應的通道已為 I/O 操作準備就緒。
			//此方法執行處於阻塞模式的選擇操作。
			selector.select();
			//返回此選擇器的已選擇鍵集。
			selectionKeys = selector.selectedKeys();
			//System.out.println(selectionKeys.size());
			iterator = selectionKeys.iterator();
			while (iterator.hasNext()) {
				selectionKey = iterator.next();
				if (selectionKey.isConnectable()) {
					System.out.println("client connect");
					client = (SocketChannel) selectionKey.channel();
					// 判斷此通道上是否正在進行連線操作。
					// 完成套接字通道的連線過程。
					if (client.isConnectionPending()) {
						client.finishConnect();
						System.out.println("完成連線!");
						sendbuffer.clear();
						BufferedInputStream br = new BufferedInputStream(new FileInputStream(new File("D:\\BigData.zip")));
						byte[] b = new byte[BLOCK];
						br.read(b);		
						sendbuffer.put(b);
						sendbuffer.flip();
						System.out.println(System.currentTimeMillis());
						client.write(sendbuffer);
						System.out.println(System.currentTimeMillis());
					}
					client.register(selector, SelectionKey.OP_READ);
				} else if (selectionKey.isReadable()) {
					client = (SocketChannel) selectionKey.channel();
					//將緩衝區清空以備下次讀取
					receivebuffer.clear();
					//讀取伺服器傳送來的資料到緩衝區中
					count=client.read(receivebuffer);
					if(count>0){
						receiveText = new String( receivebuffer.array(),0,count);
						System.out.println("客戶端接受伺服器端資料--:"+receiveText);
						client.register(selector, SelectionKey.OP_WRITE);
					}
				} 
			}
			selectionKeys.clear();
		}
	}
}
例子是TestReadClient向TestReadServer傳送一個本地檔案。TestReadServer收到後每次列印讀取到的位元組數。

如何體現非同步I/O?

看看TestReadClient中的:

				if (selectionKey.isConnectable()) {
					System.out.println("client connect");
					client = (SocketChannel) selectionKey.channel();
					// 判斷此通道上是否正在進行連線操作。
					// 完成套接字通道的連線過程。
					if (client.isConnectionPending()) {
						client.finishConnect();
如果沒有client.finishConnect();這句等待完成socket連線,可能會報異常:java.nio.channels.NotYetConnectedException

非同步的才不會管你有沒有連線成功,都會執行下面的程式碼。這裡需要人為的干預。

如果要證明是java的nio單獨使用非阻塞I/O,真沒辦法!!!阻塞非阻塞要檢視程序。。。

不過還有種說法,叫非同步非阻塞。上面那段,是用非同步方式建立連線,程序當然沒有被阻塞。使用了finishConnect()這是人為將程式中止,等待連線建立完成(是模仿阻塞將當前程序阻塞掉,還是模仿非阻塞不斷輪詢訪問,不重要了反正是程式卡住沒往下執行)。

所以,建立連線的過程用非同步非阻塞I/O可以解釋的通。那read/write的過程呢?

根據上面例子的列印結果,可以知道這個過程是同步的,沒執行完是不會執行下面的程式碼的。至於底下是使用阻塞I/O還是非阻塞I/O,對於應用級程式來說不重要了。

阻塞還是非阻塞,對於正常的開發(創立連線,從連線中讀寫資料)並沒有多少的提升,操作過程都類似。

那NIO憑什麼成為高效能架構的基礎,比起IO,效能優越在哪裡,接著猜。。。

java nio有意模仿作業系統的通道控制方式,那他的底層是不是就是直接使用作業系統的通道?

通道中的資料是以塊為單位的,之前的流是以位元組為單位的,同樣的資料流操作外設的次數較多。程式碼中channel都是針對ByteBuffer物件進行read/write的,而ByteBuffer又是ByteBuffer.allocate(BLOCK);這樣建立的,是一個連續的塊空間。

那ByteBuffer是不是也是模擬作業系統的快取?

快取在io也有,如BufferedInputStream。CPU和外設的速度差很多,快取為了提高CPU使用率,等外設將資料讀入快取後,CPU再統一操作,不用外設讀一次,CPU操作一次,CPU的效率會被拉下來。。。