1. 程式人生 > >Java NIO-非阻塞通訊

Java NIO-非阻塞通訊

NIO(Non-block IO)指非阻塞通訊,相對於其程式設計的複雜性,通常客戶端並不需要使用非阻塞通訊以提高效能,故這裡只有服務端使用非阻塞通訊方式實現

客戶端:

package com.test.client;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;

import org.apache.log4j.Logger;

import com.test.util.SocketIO;

public class Client {
	static Logger logger = Logger.getLogger(Client.class);
	private int port = 10000;
	private SocketChannel socketChannel;
	
	public Client(){
		try {
			socketChannel = SocketChannel.open();
			InetAddress host = InetAddress.getLocalHost();
			InetSocketAddress addr = new InetSocketAddress(host, port);
			
			socketChannel.connect(addr);
			
			logger.debug("***");
			logger.debug("client ip:"+socketChannel.socket().getLocalAddress());
			logger.debug("client port:"+socketChannel.socket().getLocalPort());
			logger.debug("server ip:"+socketChannel.socket().getInetAddress());
			logger.debug("server port:"+socketChannel.socket().getPort());
			logger.debug("***");
		} catch (IOException e) {
			e.printStackTrace();
			logger.error("Cilent socket establish failed!");
		}
		logger.info("Client socket establish success!");
	}
	
	public void request(String request){
		try{
			DataInputStream input = SocketIO.getInput(socketChannel.socket());
			DataOutputStream output = SocketIO.getOutput(socketChannel.socket());
			
			if(null != request && !request.equals("")){
				byte[] bytes = request.getBytes("utf-8");
				output.write(bytes);
		
				bytes = new byte[64];
				int num = input.read(bytes);
				byte[] answer = new byte[num];
				System.arraycopy(bytes, 0, answer, 0, num);
				if(num > 0){
					logger.info("server answer:"+new String(answer,"utf-8"));
				}else{
					logger.info("No server answer.");
				}
			}
		}catch(Exception e){
			e.printStackTrace();
			logger.error("client request error");
		}finally{
			if(null != socketChannel){
				try{
					socketChannel.close();
				}catch(Exception e){
					e.printStackTrace();
					logger.error("socket close error");
				}
			}
		}
	}
	
	public static void main(String[] args){
		Client client1 = new Client();
		//Client client2 = new Client();
		client1.request("your name?");
		//client2.request("your name?");
	}
}

服務端:

package com.test.server;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

import org.apache.log4j.Logger;

public class Server {
	static Logger logger = Logger.getLogger(Server.class);
	private Selector selector;
	private ServerSocketChannel serverSocketChannel;
	private int queueNum = 10;
	private int bindPort = 10000;
	private int step = 0;
	private Charset charset = Charset.forName("utf-8");
	private ByteBuffer buffer = ByteBuffer.allocate(64);
	
	public Server(){
		try{
			//為ServerSocketChannel監控接收連線就緒事件
			//為SocketChannel監控連線就緒事件、讀就緒事件以及寫就緒事件
			selector = Selector.open();
			//作用相當於傳統通訊中的ServerSocket
			//支援阻塞模式和非阻塞模式
			serverSocketChannel = ServerSocketChannel.open();
			serverSocketChannel.socket().setReuseAddress(true);
			//非阻塞模式
			serverSocketChannel.configureBlocking(false);
			//serverSocketChannel.socket()會獲得一個和當前通道相關聯的socket
			serverSocketChannel.socket().bind(new InetSocketAddress(bindPort),queueNum);
			
			//註冊接收連線就緒事件
			//註冊事件後會返回一個SelectionKey物件用以跟蹤註冊事件控制代碼
			//該SelectionKey將會放入Selector的all-keys集合中,如果相應的事件觸發
			//該SelectionKey將會放入Selector的selected-keys集合中
			serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
		}catch(Exception e){
			e.printStackTrace();
			logger.error("Server establish error!");
		}
		logger.info("Server start up!");
	}

	public void service() throws Exception{
		//判斷是否有觸發事件
		while(selector.select() > 0){
			Set<SelectionKey> selectedKeys = selector.selectedKeys();
			Iterator<SelectionKey> iterator = selectedKeys.iterator();
			
			while(iterator.hasNext()){
				SelectionKey selectionKey = iterator.next();
				//處理事件後將事件從Selector的selected-keys集合中刪除
				iterator.remove();
				try{
					if(selectionKey.isAcceptable()){
						this.Acceptable(selectionKey);
					}else if(selectionKey.isReadable()){
						this.Readable(selectionKey);
					}else if(selectionKey.isWritable()){
						this.Writable(selectionKey);
					}
				}catch(Exception e){
					e.printStackTrace();
					logger.error("event deal exception!");
				}
			}
		}
	}
	
	private void Acceptable(SelectionKey selectionKey) throws Exception{
		logger.info("accept:"+(++step));
		
		ServerSocketChannel ssc = (ServerSocketChannel)selectionKey.channel();
		SocketChannel sc = (SocketChannel)ssc.accept();
		
		sc.configureBlocking(false);
		sc.register(selector, SelectionKey.OP_READ);
		
		logger.info(selectionKey.hashCode());
	}
	
	private void Readable(SelectionKey selectionKey) throws Exception{
		logger.info("read:"+(++step));
		
		SocketChannel sc = (SocketChannel)selectionKey.channel();
		
		buffer.clear();
		int num = sc.read(buffer);
		String request = "";
		if(num > 0){
			buffer.flip();
			
			request = charset.decode(buffer).toString();
			sc.register(selector, SelectionKey.OP_WRITE,request);
		}else{
			sc.close();
		}
		
		logger.info(selectionKey.hashCode()+":"+request);
	}
	
	private void Writable(SelectionKey selectionKey) throws Exception{
		logger.info("write:"+(++step));
		
		String request = (String)selectionKey.attachment();
		SocketChannel sc = (SocketChannel)selectionKey.channel();
		
		String answer = "not supported";
		if(request.equals("your name?")){
			answer = "server";
		}
		
		logger.info(selectionKey.hashCode()+":"+answer);
		
		buffer.clear();
		buffer.put(charset.encode(answer));
		buffer.flip();
		while(buffer.hasRemaining())
			sc.write(buffer);
		
		sc.close();
	}
	
	public static void main(String[] args) {
		Server server = new Server();
		try{
			server.service();
		}catch(Exception e){
			e.printStackTrace();
			logger.error("Server run exception!");
		}
	}
}

IO工具類:

package com.test.util;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;

public class SocketIO{
	public static DataInputStream getInput(Socket socket) throws IOException{
		//接收快取區大小,socket獲取輸入流之前設定
		socket.setReceiveBufferSize(10);
		InputStream input = socket.getInputStream();
		return new DataInputStream(input);
	}
	
	public static DataOutputStream getOutput(Socket socket) throws IOException{
		//傳送快取區大小,socket獲取輸出流之前設定
		socket.setSendBufferSize(10);
		OutputStream output = socket.getOutputStream();
		return new DataOutputStream(output);
	}
}

log4j日誌配置檔案:

log4j.rootLogger=debug,logOutput

log console out put 
log4j.appender.logOutput=org.apache.log4j.ConsoleAppender
log4j.appender.logOutput.layout=org.apache.log4j.PatternLayout
log4j.appender.logOutput.layout.ConversionPattern=%p%d{[yy-MM-dd HH:mm:ss]}[%c] -> %m%n

server端的執行結果:

INFO[13-10-16 11:40:41][com.test.server.Server] -> Server start up!
INFO[13-10-16 11:40:53][com.test.server.Server] -> accept:1
INFO[13-10-16 11:41:14][com.test.server.Server] -> 20469344
INFO[13-10-16 11:41:21][com.test.server.Server] -> read:2
INFO[13-10-16 11:41:37][com.test.server.Server] -> 11688861:your name?
INFO[13-10-16 11:43:00][com.test.server.Server] -> write:3
INFO[13-10-16 11:43:00][com.test.server.Server] -> 11688861:server

可以看到readable方法中的SelectionKey和writable方法中的SelectionKey的雜湊碼是完全相同的,是同一個SelectionKey

SelectionKey是在SocketChannel類或ServerSocketChannel類註冊要監控的事件時產生的,這兩個類本身並沒有register方法,需要檢視它們共同父類AbstractSelectableChannel(只有關鍵程式碼):

public abstract class AbstractSelectableChannel
		extends SelectableChannel{
	......
	// Keys that have been created by registering this channel with selectors.
    // They are saved because if this channel is closed the keys must be
    // deregistered.  Protected by keyLock.
    private SelectionKey[] keys = null;

	public final SelectionKey register(Selector sel, int ops, Object att)
			throws ClosedChannelException{
		if (!isOpen())
			throw new ClosedChannelException();
		if ((ops & ~validOps()) != 0)
			throw new IllegalArgumentException();
		synchronized (regLock) {
			if (blocking)
				throw new IllegalBlockingModeException();
			SelectionKey k = findKey(sel);
			if (k != null) {
				k.interestOps(ops);
				k.attach(att);
			}
			if (k == null) {
				// New registration
				k = ((AbstractSelector)sel).register(this, ops, att);
				addKey(k);
			}
			return k;
		}
	}

	private SelectionKey findKey(Selector sel) {
		synchronized (keyLock) {
			if (keys == null)
	        	return null;
		    for (int i = 0; i < keys.length; i++)
	        	if ((keys[i] != null) && (keys[i].selector() == sel))
	            	return keys[i];
		    return null;
		}
	}

	void removeKey(SelectionKey k) {			// package-private
		synchronized (keyLock) {
		    for (int i = 0; i < keys.length; i++)
			if (keys[i] == k) {
			    keys[i] = null;
			    keyCount--;
			}
		    ((AbstractSelectionKey)k).invalidate();
		}
	}
	......
}

ServerSocketChannel和Socketchannel向Selector中註冊了特定事件,Selector就會監控這些事件是否發生。ServerSocketChannel和Socketchannel都為AbstractSelectableChannel類的子類,AbstractSelectableChannel類的register方法負責註冊事件,該方法會返回一個SelectionKey物件,該物件用於跟蹤被註冊事件

public abstract class SelectionKey {
    protected SelectionKey() { }

    public abstract SelectableChannel channel();

    public abstract Selector selector();
    ......
}

一個Selector物件中包含了3種類型的鍵集(即SelectionKey集合,SelectionKey在以下部分被稱為“鍵”)

1,all-keys:所有註冊至該Selector的事件鍵集(selector.keys())

2,selected-keys:相關事件已經被Selector捕獲的鍵集(selector.selectedKeys())

3,cancel-keys:已被取消的鍵集(無法訪問該集合)

selected-keys和cancel-keys都為all-keys的子集,對於一個新建的Selector,這3個鍵集都為空

註冊事件時會將相應的SelectionKey加入Selector的all-keys鍵集中

取消SelectionKey或者關閉了SelectionKey相關聯的Channel,則會將相應的SelectionKey加入cancel-keys鍵集中

當執行選擇器的選擇操作時(selector.select(),對於選擇器來說,這個方法應該是相當重要的):

1,將cancel-keys中的每個SelectionKey從3個鍵集中移除(如果存在的話),並登出SelectionKey所關聯的Channel,cancel-keys鍵集變為空集。

2,如果監控的事件發生,Selector會將相關的SelectionKey加入selected-keys鍵集中

以下為對原始碼的分析、推測:

Selector作為選擇器,儲存了所有的Selectionkey(註冊的,取消的,觸發的),通過上面的AbstractSelectableChannel類的原始碼,發現Channel本身也儲存了一個自身關聯的SelectionKey陣列,這看起來是完全沒有必要的,但是仔細看一下register方法,能看出些許端倪:

Selector本身維護了3個集合,all-keys,selected-keys和cancel-keys,頻繁的註冊操作、取消註冊將會導致這3個集合頻繁的變化,伴隨頻繁變化的是頻繁的加鎖,這會嚴重的降低Selector的效能,畢竟一個Selector會被多個Channel作為選擇器使用,本身非阻塞的實現方式就是提高效能的一種解決方式

當註冊新的事件時,如果存在該通道相關的SelectionKey,則更新該SelectionKey所關注的事件以及其攜帶的附加資訊,如果不存在,則新增新的SelectionKey

這樣做的好處是,比起刪除以前的SelectionKey,新增新的SelectionKey,修改SelectionKey所關注的事件以及其攜帶的附加資訊顯然是更好的選擇,畢竟不需要修改Selector所維護的鍵集,當然也不需要頻繁加鎖(通過檢視Selector類的api,SelectionKey並不是thread-safe的,顯然並沒有加鎖,但是並沒有什麼問題),能夠提供更好的效能

總之,SelectionKey的雜湊碼會重複是很正常的,畢竟不是單純的註冊時新建、觸發後刪除方式,java實現時進行了優化