1. 程式人生 > >JAVA網路通訊之NIO

JAVA網路通訊之NIO

本篇將展現JAVA網路通訊中NIO的部分,和上一篇不同,本篇所用的Socket是通過SocketChannel的方式,這是NIO與傳統IO最主要的區別。

一、基於緩衝的SocketChannel

和傳統的IO基於流的方式不同,NIO採用基於緩衝的方式。二者的最重要的區別就是基於流的方式是阻塞的,而基於緩衝的方式卻不需要阻塞。關於阻塞與非阻塞,最直觀的的便是當我們通過IO的方式進行通訊的時候,當前執行緒必須等待返回請求響應並處理完該響應之後才能進行傳送下一次請求。但NIO卻並非如此,當通過SocketChannel傳送請求之後,當前執行緒無需等待便可傳送下一次請求。

二、基於事件的Selector

Selector是JAVA NIO中的新概念,它通常需要跟Channel配合使用,用於註冊Channel並監聽Channel的事件。在JAVA的網路通訊中,Channel的事件有下列四種:

  • Connect, 連線就緒事件,當SocketChannel成功連線到伺服器時,該事件觸發。

  • Accept,接收就緒事件,當ServerSocketChannel成功接收到客戶端的請求,並表示可以接收資料時,該事件觸發。

  • Read,讀就緒事件,當SocketChannel或ServerSocketChannel表示可以讀取通道中資料時,該事件觸發。

  • Write,寫就緒事件,當SocketChannel或ServerSocketChannel表示可以往通道中寫資料時,該事件觸發。

當Channel在Selector註冊之後,會生成一個對應的SelectionKey,該SelectionKey會立即返回,也可以通過Selector的selectedKeys()方法獲取一個就緒狀態的SelectionKey集。之後便可通過SelectionKey來控制請求會話。具體程式碼如下:

客戶端:

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SocketClient {

	private Selector selector;

	private SocketChannel socketCli;

	private String host;

	private int port;

	public SocketClient(String host, int port) throws Exception {
		socketCli = SocketChannel.open();
		socketCli.configureBlocking(false); // 非阻塞模式
		selector = Selector.open();
		this.host = host;
		this.port = port;
	}

	public void send(String msg) throws IOException {
                /** 
                 * 在請求連線之前註冊監聽Connect事件,否則將無法在Selector中獲取到SelectorKey,
                 * 更無法獲取SocketChannel的Connect事件
                 */
		socketCli.register(selector, SelectionKey.OP_CONNECT);
		socketCli.connect(new InetSocketAddress(InetAddress.getByName(host), port)); // 請求連線
		while (selector.select(100) > 0) { // 輪詢校驗通道狀態是否就緒
			Set<SelectionKey> selectionKeys = selector.selectedKeys(); // 獲取就緒狀態的SelectorKey集合
			Iterator<SelectionKey> iterator = selectionKeys.iterator();
			while (iterator.hasNext()) {
				SelectionKey selectionKey = iterator.next();
				iterator.remove(); 
				if (selectionKey.isValid()) { // 是否有效
					if (selectionKey.isConnectable()) { // 是否連線就緒
						SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
						if (socketChannel.finishConnect()) {
							socketChannel.register(selector, SelectionKey.OP_WRITE); // 註冊寫就緒事件
						}
					}
					if (selectionKey.isWritable()) { // 是否寫就緒
						SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
						byte[] bytes = msg.getBytes();
						ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
						byteBuffer.put(bytes); // 將位元組寫入緩衝
						byteBuffer.flip(); // 切換到讀模式
						while (byteBuffer.hasRemaining()) {
							socketChannel.write(byteBuffer); // 將緩衝寫入通道
						}
						socketChannel.register(selector, SelectionKey.OP_READ); // 註冊讀就緒事件
					}
					if (selectionKey.isReadable()) { // 是否讀就緒
						SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
						ByteBuffer byteBufferRead = ByteBuffer.allocate(1024);
						int readBytes = socketChannel.read(byteBufferRead); // 將通道中的資料讀入緩衝
						if (readBytes > 0) {
							byteBufferRead.flip(); // 切換到讀模式
							byte[] bytesRead = new byte[byteBufferRead.remaining()];
							byteBufferRead.get(bytesRead); // 將緩衝資料讀入到位元組陣列中
							System.out.println("from SERVER : " + new String(bytesRead, "UTF-8"));
						}
					}
				}
			}
		}
	}

	public void close() {
		if (socketCli != null && socketCli.isOpen()) {
			try {
				socketCli.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {

		ExecutorService executor = Executors.newFixedThreadPool(5);

		for (int i = 1; i <= 1000; i  ) {
			final int index = i;
			executor.submit(new Runnable() {

				@Override
				public void run() {
					try {
						SocketClient cli = new SocketClient("127.0.0.1", 8088);
						cli.send("Hello world, No." + String.format("%04d", index));
						cli.close();
					} catch (IOException e) {
						e.printStackTrace();
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			});

		}
	}
}

伺服器端:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class SocketServer {

	private Selector selector;

	private ServerSocketChannel socketSvr;

	private boolean flag = true;

	public SocketServer(int port) {
		try {
			System.out.println("Server start...");
			socketSvr = ServerSocketChannel.open();
			socketSvr.socket().setReuseAddress(true);
			socketSvr.socket().bind(new InetSocketAddress(port), 1024); // 繫結埠
			socketSvr.configureBlocking(false); // 非阻塞模式
			selector = Selector.open();
			socketSvr.register(selector, SelectionKey.OP_ACCEPT); // 註冊接收就緒
		}  catch (IOException e) {
			e.printStackTrace();
			try {
				if (socketSvr != null && socketSvr.isOpen()) {
					socketSvr.close();
				}
			} catch (IOException e1) {
				e1.printStackTrace();
			}
			try {
				if (selector != null && selector.isOpen()) {
					selector.close();
				}
			} catch (IOException e1) {
				e1.printStackTrace();
			}
		}
	}

	public void accept() {
		try {
			while (flag) {
				selector.select(1000);
				Set<SelectionKey> selectionKeys = selector.selectedKeys();
				Iterator<SelectionKey> iterator = selectionKeys.iterator();
				while (iterator.hasNext()) {
					SelectionKey selectionKey = iterator.next();
					iterator.remove();
					try {
						if (selectionKey.isValid()) {
							if (selectionKey.isAcceptable()) {
								ServerSocketChannel socketSvrChannel = (ServerSocketChannel) selectionKey.channel();
								SocketChannel socketChannel = socketSvrChannel.accept();
								socketChannel.configureBlocking(false);
								socketChannel.register(selector, SelectionKey.OP_READ);
							}
							if (selectionKey.isReadable()) {
								SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
								ByteBuffer byteBufferRead = ByteBuffer.allocate(1024);
								int readBytes = 0;
								readBytes = socketChannel.read(byteBufferRead);
								if (readBytes > 0) {
									byteBufferRead.flip();
									byte[] bytesRead = new byte[byteBufferRead.remaining()];
									byteBufferRead.get(bytesRead);
									System.out.println("from CLI : " + new String(bytesRead, "UTF-8"));
									socketChannel.register(selector, SelectionKey.OP_WRITE);
								}
							}
							if (selectionKey.isWritable()) {
								SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
								byte[] bytesWrite = "SESSION ENDING...".getBytes();
								ByteBuffer byteBufferWrite = ByteBuffer.allocate(bytesWrite.length);
								byteBufferWrite.put(bytesWrite);
								byteBufferWrite.flip();
								socketChannel.write(byteBufferWrite);
                                                                // 取消註冊,防止停留在Writable狀態導致重複寫入訊息
    								selectionKey.cancel();
							}
						}
					} catch (IOException e) {
						e.printStackTrace();
						selectionKey.cancel();
					}
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}
	public static void main(String[] args) {
		new SocketServer(8088).accept();
	}
}