1. 程式人生 > >Java NIO(三)阻塞與非阻塞

Java NIO(三)阻塞與非阻塞

   阻塞與非阻塞

阻塞   傳統的 IO 流都是阻塞式的。也就是說,當一個執行緒呼叫 read() 或 write()時,該執行緒被阻塞,直到有一些資料被讀取或寫入,該執行緒在此期間不能執行其他任務。因此,在完成網路通訊進行 IO 操作時,由於執行緒會阻塞,所以伺服器端必須為每個客戶端都提供一個獨立的執行緒進行處理,當伺服器端需要處理大量客戶端時,效能急劇下降。

非阻塞  Java NIO 是非阻塞模式的。當執行緒從某通道進行讀寫資料時,若沒有資料可用時,該執行緒可以進行其他任務。執行緒通常將非阻塞 IO 的空閒時間用於在其他通道上執行 IO 操作,所以單獨的執行緒可以管理多個輸入和輸出通道。因此,NIO 可以讓伺服器端使用一個或有限幾個執行緒來同時處理連線到伺服器端的所有客戶端。

1. 通道(Channel):負責連線
        
       java.nio.channels.Channel 介面:
            |--SelectableChannel
                |--SocketChannel
                |--ServerSocketChannel
                |--DatagramChannel


                |--Pipe.SinkChannel
                |--Pipe.SourceChannel


2. 緩衝區(Buffer):負責資料的存取

3. 選擇器(Selector):是 SelectableChannel 的多路複用器。用於監控 SelectableChannel 的 IO 狀況

阻塞式

    @Test
	public void client() throws Exception{
		//建立通道
		FileChannel open = FileChannel.open(Paths.get("2.jpg"), StandardOpenOption.READ);
		//分配緩衝區
		ByteBuffer buffer = ByteBuffer.allocate(1024);
		//建立socket通道
		SocketChannel open2 = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9988));
		//讀取本地檔案,併發送到服務端
		while(open.read(buffer)!=-1){
			buffer.flip();
		open2.write(buffer);
		buffer.clear();
		}
		open2.shutdownOutput();
		//獲取服務端反饋
		int len=0;
		while((len=open2.read(buffer))!=-1){
			buffer.flip();
			System.out.println(new String (buffer.array(),0,len));
			buffer.clear();
		}
		open.close();
		open2.close();
	};
	
	@Test
	public void server() throws IOException{
		// 獲取通道
		FileChannel open = FileChannel.open(Paths.get("11.jpg"), StandardOpenOption.WRITE,
				StandardOpenOption.CREATE);
		ServerSocketChannel open2 = ServerSocketChannel.open();
		//分配指定大小的緩衝區
		ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
		//繫結連線
		open2.bind(new InetSocketAddress(9988));
		//獲取客戶端連線的通道
		SocketChannel accept = open2.accept();
		//接收客戶端的資料,並儲存到本地
		while(accept.read(byteBuffer)!=-1){
			byteBuffer.flip();
			open.write(byteBuffer);
			byteBuffer.clear();
		}
		
 
		//傳送反饋給客戶端
		byteBuffer.put("圖片收到了".getBytes());
		byteBuffer.flip();
		accept.write(byteBuffer);
		accept.close();
		open.close();
		open2.close();
	}

非阻塞式

@Test
	public void client() throws Exception{
		//1. 獲取通道
		SocketChannel clientSoc=SocketChannel.open(new InetSocketAddress("127.0.0.1", 9988));
		//2. 切換非阻塞模式
		clientSoc.configureBlocking(false);
		//3. 分配指定大小的緩衝區
		ByteBuffer allocate = ByteBuffer.allocate(1024);
		//4. 傳送資料給服務端
		Scanner sc=new Scanner(System.in);
		while(sc.hasNext()){
			String next = sc.next();
			allocate.put((LocalDate.now()+"--客戶端說"+"\n"+next).getBytes());
			allocate.flip();
			clientSoc.write(allocate);
			allocate.clear();
		}
		sc.close();
		//5. 關閉通道
		clientSoc.close();
	}
	
	
	@Test
	public void Server() throws Exception{
		//1. 獲取通道
		ServerSocketChannel serSoc=ServerSocketChannel.open();
		//2. 切換非阻塞模式
		serSoc.configureBlocking(false);
		//3. 繫結連線
		serSoc.bind(new InetSocketAddress(9988));
		//4. 獲取選擇器
		Selector selector = Selector.open();
		//5. 將通道註冊到選擇器上, 並且指定“監聽接收事件”
		serSoc.register(selector,  SelectionKey.OP_ACCEPT);
		//6. 輪詢式的獲取選擇器上已經“準備就緒”的事件
		while(selector.select()>0){
			//7. 獲取當前選擇器中所有註冊的“選擇鍵(已就緒的監聽事件)”
			Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
			while(iterator.hasNext()){
				//8. 獲取準備“就緒”的是事件
				SelectionKey sk = iterator.next();
				//9. 判斷具體是什麼事件準備就緒
				if(sk.isAcceptable()){
					//10. 若“接收就緒”,獲取客戶端連線
					SocketChannel sChannel = serSoc.accept();
					//11. 切換非阻塞模式
					sChannel.configureBlocking(false);
					//12. 將該通道註冊到選擇器上
					sChannel.register(selector, SelectionKey.OP_READ);
				}else if(sk.isReadable()){
					//13. 獲取當前選擇器上“讀就緒”狀態的通道
					SocketChannel sChannel = (SocketChannel) sk.channel();
					//14. 讀取資料
					ByteBuffer buf = ByteBuffer.allocate(1024);
					int len = 0;
					while((len = sChannel.read(buf)) > 0 ){
						buf.flip();
						System.out.println(new String(buf.array(), 0, len));
						buf.clear();
					}
				}
			}
			//15. 取消選擇鍵 SelectionKey
			iterator.remove();
		}
		
	}

註冊Selector

serSoc.register(selector,  SelectionKey.OP_ACCEPT);

當呼叫 register(Selector sel, int ops) 將通道註冊選擇器時,選擇器

對通道的監聽事件,需要通過第二個引數 ops 指定。
可以監聽的事件型別(用 可使用 SelectionKey  的四個常量 表示):
    讀 : SelectionKey.OP_READ (1)
    寫 : SelectionKey.OP_WRITE (4)
    連線 : SelectionKey.OP_CONNECT (8)
    接收 : SelectionKey.OP_ACCEPT (16)
  若註冊時不止監聽一個事件,則可以使用“位或”操作符連線。

int interestKey=SelectionKey.OP_ACCEPT|SelectionKey.OP_WRITE;

SelectionKey:表示 SelectableChannel 和 Selector 之間的註冊關係。每次向
選擇器註冊通道時就會選擇一個事件(選擇鍵)。選擇鍵包含兩個表示為整
數值的操作集。操作集的每一位都表示該鍵的通道所支援的一類可選擇操作。

方 法 描 述

int interestOps()                                 獲取感興趣事件集合
int readyOps()                                    獲取通道已經準備就緒的操作的集合
SelectableChannel channel()             獲取註冊通道
Selector selector()                              返回選擇器
boolean isReadable()                         檢測 Channal 中讀事件是否就緒
boolean isWritable()                           檢測 Channal 中寫事件是否就緒
boolean isConnectable()                    檢測 Channel 中連線是否就緒
boolean isAcceptable()                       檢測 Channel 中接收是否就緒
選擇 器(Selector )的應用

Selector  的常用方法描述
Set<SelectionKey> keys()     所有的 SelectionKey 集合。代表註冊在該Selector上的Channel
selectedKeys()                        被選擇的 SelectionKey 集合。返回此Selector的已選擇鍵集
int select()                           監控所有註冊的Channel,當它們中間有需要處理的 IO 操作時,該方法返回,並將對應得的 SelectionKey 加入被選擇的SelectionKey 集合中,該方法返回這些 Channel 的數量。
int select(long timeout)         可以設定超時時長的 select() 操作
int selectNow()                      執行一個立即返回的 select() 操作,該方法不會阻塞執行緒
Selector wakeup()                  使一個還未返回的 select() 方法立即返回
void close()                            關閉該選擇器

UDP的傳送

    @Test
	public void send() throws IOException{
		DatagramChannel dc = DatagramChannel.open();
		dc.configureBlocking(false);
		ByteBuffer buf = ByteBuffer.allocate(1024);
		Scanner scan = new Scanner(System.in);
		while(scan.hasNext()){
			String str = scan.next();
			buf.put((new Date().toString() + ":\n" + str).getBytes());
			buf.flip();
			dc.send(buf, new InetSocketAddress("127.0.0.1", 9898));
			buf.clear();
		}
		
		dc.close();
	}
	
	@Test
	public void receive() throws IOException{
		DatagramChannel dc = DatagramChannel.open();
		dc.configureBlocking(false);
		dc.bind(new InetSocketAddress(9898));
		Selector selector = Selector.open();
		dc.register(selector, SelectionKey.OP_READ);
		while(selector.select() > 0){
			Iterator<SelectionKey> it = selector.selectedKeys().iterator();
			while(it.hasNext()){
				SelectionKey sk = it.next();
				if(sk.isReadable()){
					ByteBuffer buf = ByteBuffer.allocate(1024);
					dc.receive(buf);
					buf.flip();
					System.out.println(new String(buf.array(), 0, buf.limit()));
					buf.clear();
				}
			}
			it.remove();
		}
	}

單向管道

    @Test
	public void test01() throws IOException{
		//1. 獲取管道
		Pipe pipe = Pipe.open();
		
		//2. 將緩衝區中的資料寫入管道
		ByteBuffer buf = ByteBuffer.allocate(1024);
		
		Pipe.SinkChannel sinkChannel = pipe.sink();
		buf.put("通過單向管道傳送資料".getBytes());
		buf.flip();
		sinkChannel.write(buf);
		
		//3. 讀取緩衝區中的資料
		Pipe.SourceChannel sourceChannel = pipe.source();
		buf.flip();
		int len = sourceChannel.read(buf);
		System.out.println(new String(buf.array(), 0, len));
		
		sourceChannel.close();
		sinkChannel.close();
	}

原文地址: https://blog.csdn.net/yjaspire/article/details/80518517