1. 程式人生 > >nio 客戶端與服務端通訊Demo

nio 客戶端與服務端通訊Demo

本篇博文主要是從網上收集和整理眾多網友關於NIO的理解所寫的博文,非作者原創(除最後的服務端與客戶端通訊的Demo),在此宣告。

1. NIO入門概念:

主要參考文獻:Java nio 使用及原理分析

Java NIO 使用及原理分析(一):

      主要對緩衝區Buffer的概念和通道Channel的概念進行了簡單的介紹;

    緩衝區 實際上是一個數組,在NIO庫中,所有資料都是用緩衝區處理的。在讀取資料時,它是直接讀到緩衝區中的; 在寫入資料時,它也是寫入到緩衝區中的;任何時候訪問 NIO 中的資料,都是將它放到緩衝區中。而在面向流I/O系統中,所有資料都是直接寫入或者直接將資料讀取到Stream物件中。

    通道是一個物件,通過它可以讀取和寫入資料,當然了所有資料都通過Buffer物件來處理。我們永遠不會將位元組直接寫入通道中,相反是將資料寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將資料從通道讀入緩衝區,再從緩衝區獲取這個位元組。

使用NIO讀取資料 (將chanel對應的終端資料讀入到buffer中, 核心方法inChannel.read(buffer))

1. 從FileInputStream獲取Channel
2. 建立Buffer
3. 將資料從Channel讀取(read)到Buffer中

使用NIO寫入資料 (將buffer中的資料寫入到channel對應的終端,核心方法,channel.write(buffer))


1. 從FileInputStream獲取Channel

2. 建立Buffer

3. 將資料從Channel寫入(write)到Buffer中

import java.io.*;  
import java.nio.*;  
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;  
  
public class ChannelReadToBuffer {  
    static public void main( String args[] ) throws Exception { 
    	Charset charset = Charset.forName("GBK");
    	CharsetDecoder decoder = charset.newDecoder();
        FileInputStream fin = new FileInputStream("e:\\nioTest\\src.txt");  
        // 獲取通道  
        FileChannel fc = fin.getChannel();  
        // 建立緩衝區  
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);  
        CharBuffer charBuffer = CharBuffer.allocate(512);
        // 讀取資料到緩衝區  
        fc.read(byteBuffer);  
        byteBuffer.flip();
        
        decoder.decode(byteBuffer, charBuffer, false);  
        charBuffer.flip();
        
        System.out.println(charBuffer);            
        fc.close();  
        fin.close();
    }  
}  

    import java.io.*;  
    import java.nio.*;  
    import java.nio.channels.*;  
      
    public class Program {  
        static private final byte message[] = { 83, 111, 109, 101, 32,  
            98, 121, 116, 101, 115, 46 };  
      
        static public void main( String args[] ) throws Exception {  
            FileOutputStream fout = new FileOutputStream( "c:\\test.txt" );  
              
            FileChannel fc = fout.getChannel();  
              
            ByteBuffer buffer = ByteBuffer.allocate( 1024 );  
              
            for (int i=0; i<message.length; ++i) {  
                buffer.put( message[i] );  
            }  
              
            buffer.flip();  
              
            fc.write( buffer );  
              
            fout.close();  
        }  
    }  

Java NIO 使用及原理分析(二)和 Java NIO 使用及原理分析(三):

主要介紹了buffer的 position limit capacity flip slice等概念;

緩衝區的分配allocate()

緩衝區分片slice()

只讀緩衝區asReadOnlyBuffer()

記憶體對映檔案I/OMappedByteBuffer

Java NIO 使用及原理分析(四):

NIO中非阻塞I/O採用了基於Reactor模式的工作方式,I/O呼叫不會被阻塞,相反是註冊感興趣的特定I/O事件,如可讀資料到達,新的套接字連線等等,在發生特定事件時,系統再通知我們。NIO中實現非阻塞I/O的核心物件就是Selector,Selector就是註冊各種I/O事件地 方,而且當那些事件發生時,就是這個物件告訴我們所發生的事件,如下圖所示:


當有讀或寫等任何註冊的事件發生時,可以從Selector中獲得相應的SelectionKey,同時從 SelectionKey中可以找到發生的事件和該事件所發生的具體的SelectableChannel,以獲得客戶端傳送過來的資料

使用NIO中非阻塞I/O編寫伺服器處理程式,大體上可以分為下面三個步驟:

1. 向Selector物件註冊感興趣的事件
2. 從Selector中獲取感興趣的事件
3. 根據不同的事件進行相應的處理

具體博主縮寫的示例也很清楚,只是寫的有點簡單,並且沒有說清客服端如何處理詳細的請求;後面我們給出一個詳細的示例進行展示;

2. 使用selector進行客戶端與服務端的通訊

參考文獻:

(1) 白話NIO之Selector

(2) Java NIO學習筆記(三) 使用Selector客戶端與伺服器的通訊

3. 客戶端與服務端selector通訊demo

3.1 服務端程式碼:

package com.qian.nio.scoket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class ServerSocketDemo {
	
	private static final String IP = "10.86.38.57";
	private static final int PORT = 8001;
	private static final int BUFFER_SIZE = 128;
	//統計客戶端的個數
	private static int clientCount = 0;
	
	private static ServerSocketChannel serverChannel = null;
	
	public static void server() throws IOException{
		//1. 獲取服務端通道並繫結IP和埠號
		serverChannel = ServerSocketChannel.open();
		serverChannel.socket().bind(new InetSocketAddress(IP, PORT));
		//2. 將服務端通道設定成非阻塞模式
		serverChannel.configureBlocking(false);
		//3. 開啟一個選擇器
		Selector selector = Selector.open();
		//4. 向選擇器上註冊監聽事件(接收事件)// 註冊該事件後,當事件到達的時候,selector.select()會返回, 否則會一直阻塞  
		serverChannel.register(selector, SelectionKey.OP_ACCEPT);
		// 採用輪訓的方式監聽selector上是否有需要處理的事件,如果有,進行處理  
		while(true){
			// 輪訓selector
			selector.select();
			//5. 獲取選擇器上所有監聽事件值
			Set<SelectionKey> selectionKeySet = selector.selectedKeys();
			Iterator<SelectionKey> it = selectionKeySet.iterator();
			while(it.hasNext()){
				//6. 獲取selectionKey值
				SelectionKey selectionKey = it.next();
				try{//解決客戶端關閉或 服務端讀取不到獲取無法寫入客戶端報IO異常;
					//7. 根據key值判斷事件
					if(selectionKey.isValid() && selectionKey.isAcceptable()){//測試此鍵的通道是否已準備好接受新的套接字連線。
						//8. 接入事件處理
						SocketChannel socketChannel = serverChannel.accept();
						socketChannel.configureBlocking(false);
						socketChannel.register(selector, SelectionKey.OP_READ);
						//響應客戶端;(沒有必要的)導致客戶端
						clientCount++;
						replyClientForAcceptable(socketChannel);
					} else if(selectionKey.isValid() && selectionKey.isReadable()){
						// 處理客戶端傳送來的訊息
						dealClientMsg(selectionKey);						
						// 響應客戶端
						replyClientMsg(selectionKey);
					} else if(selectionKey.isValid() && selectionKey.isWritable()){
						
					}
					//10. 手動刪除selectionKey 
					it.remove();
				}catch(IOException e){
					if(selectionKey!=null){
						selectionKey.cancel();						
					}
					SocketChannel sc = (SocketChannel) selectionKey.channel();
					if(sc!=null){
						sc.socket().close();
						sc.close();
					}
					continue;//繼續監聽其他客戶端發來的訊息;
				}
			}
		}
		
	}
	
	/**
	 * @Description 響應客戶端的接入事件;
	 * @param socketChannel
	 * @throws IOException
	 */
	private static void replyClientForAcceptable(SocketChannel socketChannel) throws IOException {
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
		buffer.put(("hello client "+clientCount+"!\r\n").getBytes());
		buffer.flip();
		socketChannel.write(buffer);
		buffer.clear();
	}
	
	/**
	 * @Description 處理客戶端訊息;
	 * @param selectionKey
	 * @throws IOException
	 */
	private static void dealClientMsg(SelectionKey selectionKey) throws IOException {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
		int len = 0;
		//將客戶端socket傳送來的資料讀入到buffer中;
		while ((len = socketChannel.read(buffer)) > 0) {
			buffer.flip();
			byte[] bytes = new byte[BUFFER_SIZE];
			buffer.get(bytes, 0, len);
			String msg = new String(bytes,0,len);
			//將客戶端發來的訊息列印到控制檯
			System.out.println(msg);
		}
	}
	
	/**
	 * @Description 回覆客戶端訊息;
	 * @param selectionKey
	 * @throws IOException
	 */
	private static void replyClientMsg(SelectionKey selectionKey) throws IOException {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
		buffer.put(("get your message of client !\r\n").getBytes());
		buffer.flip();
		socketChannel.write(buffer);
		buffer.clear();
	}


	public static void main(String[] args) {
		try {
			server();
		} catch (IOException e) {
			if(serverChannel != null){
				try {
					serverChannel.close();
				} catch (IOException e1) {
					e1.printStackTrace();
				}
			}
		}
	}

}

客戶端:

package com.qian.nio.scoket;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Scanner;

public class ClientSocketDemo {
	
	private static final String IP = "10.86.38.57";
	private static final int PORT = 8001;
	private static final int BUFFER_SIZE = 128;
		
	public static void send() throws InterruptedException  {
		SocketChannel socketChannel = null;
		try{
			//1. 獲取socketChannel
			socketChannel = SocketChannel.open();
			//2. 建立連線
			socketChannel.connect(new InetSocketAddress(IP, PORT));
			//3. 設定通道為非阻塞
			socketChannel.configureBlocking(false);    
			// 傳送握手訊息
			sendHandMsg(socketChannel);
			// 接收握手訊息,實際上是接收的服務端Acceptable中的響應
			Thread.sleep(10);
			reciveServerMsg(socketChannel);
			ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
			@SuppressWarnings("resource")
			Scanner scanner = new Scanner(System.in);//鍵盤輸入
			String msg;
			System.out.println("Please input your message to server : ");
			while (scanner.hasNext()) {
				msg = scanner.nextLine();
				buffer.put((new Date() + ": " + msg).getBytes());
				buffer.flip();
				//4. 向通道寫資料(向服務端傳送資料)
				socketChannel.write(buffer);   
				buffer.clear();
				// 接收服務端發來的資料
				Thread.sleep(10);
				reciveServerMsg(socketChannel);
			}
		}catch(IOException e){			
			System.out.println("=======服務端已關閉連線 訊息傳送失敗=====");
		}finally{
			if(socketChannel != null){
				try {
					socketChannel.close();
				} catch (IOException e) {
					e.printStackTrace();
				}				
			}
		}
	}

	
	private static void sendHandMsg(SocketChannel socketChannel) throws IOException {
		//響應服務端的握手在channel.write(buffer)以後服務端才會呼叫Acceptable中響應訊息函式
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
        buffer.put((new Date() + ": hello server!\r\n").getBytes());
        buffer.flip();
        //4. 向通道寫資料
        socketChannel.write(buffer);
        buffer.clear();
	}
	
	/**
	 * @ Description接收服務端傳送回來的訊息;
	 * @param socketChannel
	 * @throws IOException
	 */
	private static void reciveServerMsg(SocketChannel socketChannel) throws IOException {
		ByteBuffer buffer = ByteBuffer.allocate(BUFFER_SIZE);
		while ((socketChannel.read(buffer)) > 0) {
            buffer.flip();
            while (buffer.hasRemaining()) {
                System.out.print((char) buffer.get());
            }
            buffer.clear();
        }
	}
	
	public static void main(String[] args) throws IOException, InterruptedException {
		send();
	}

}

演示結果:

客戶端1:

hello client 1!
get your message of client !
Please input your message to server : 
client1 a
get your message of client !
clent1 b
get your message of client !
client1 stop
get your message of client !
client1 end
get your message of client !

客戶端2:


服務端:



幾點說明和註解:

第一:

一個Channel僅僅可以註冊到一個Selector一次,如果將Channel註冊到Selector多次,那麼其實相當於在更新SelectionKey 的 insterest set。

channel.register(selector, SelectionKey.OP_READ);
channel.register(selector, SelectionKey.OP_READ | SelectionKey.OP_WRITE);

上面的 channel 註冊到同一個 Selector 兩次了, 那麼第二次的註冊其實就是相當於更新這個 Channel 的 interest set 為 SelectionKey.OP_READ | SelectionKey.OP_WRITE.

第二:

Set<SelectionKey> selectedKeys = selector.selectedKeys();

Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

while(keyIterator.hasNext()) {
    
    SelectionKey key = keyIterator.next();

    if(key.isAcceptable()) {
        // a connection was accepted by a ServerSocketChannel.

    } else if (key.isConnectable()) {
        // a connection was established with a remote server.

    } else if (key.isReadable()) {
        // a channel is ready for reading

    } else if (key.isWritable()) {
        // a channel is ready for writing
    }

    keyIterator.remove();
}

需要說明的是,select() 方法僅僅是簡單地將就緒的IO操作放到了set<selectionKey>集合中,並且用完後自己不會刪除,因此我們每次在迭代中都要手動的呼叫 keyIterator.remove() 方法將這個key刪除。例如:我們在收到OP_ACCEPT 通知, 然後我們進行相關處理, 但是並沒有將這個 Key 從 SelectedKeys 中刪除, 那麼下一次 select() 返回時 我們還可以在 SelectedKeys 中獲取到 OP_ACCEPT 的 key.

第三:

關於在不同監聽事件中,key.channel()返回物件的區分;

if (key.isAcceptable()) {
      // 當 OP_ACCEPT 事件到來時, 我們就有從 ServerSocketChannel 中獲取一個 SocketChannel,
      //注意, 在 OP_ACCEPT 事件中, 從 key.channel() 返回的 Channel 是 ServerSocketChannel.
      // 而在 OP_WRITE 和 OP_READ 中, 從 key.channel() 返回的是 SocketChannel.
      SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
      clientChannel.configureBlocking(false);
      // 在 OP_ACCEPT 到來時, 再將這個 Channel 的 OP_READ 註冊到 Selector 中.
      // 注意, 這裡我們如果沒有設定 OP_READ 的話, 即 interest set 仍然是 OP_CONNECT 的話, 
      // 那麼 select 方法會一直直接返回.
      clientChannel.register(key.selector(), OP_READ, ByteBuffer.allocate(BUF_SIZE));
 }