1. 程式人生 > >NIO實現非阻塞式Socket通訊

NIO實現非阻塞式Socket通訊

Selector 非阻塞通訊核心

  • Selector負責監控所有已經註冊的Channel
  • Selector通過SelectionKey來關聯Channel
  • 群聊伺服器nio非阻塞式
public class NServer
{
	// 用於檢測所有Channel狀態的Selector
	private Selector selector = null;
	static final int PORT = 30000;
	// 定義實現編碼、解碼的字符集物件
	private Charset charset = Charset.forName("UTF-8");
	public void init()throws IOException
	{
		selector = Selector.open();
		// 通過open方法來開啟一個未繫結的ServerSocketChannel例項
		ServerSocketChannel server = ServerSocketChannel.open();
		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
		// 將該ServerSocketChannel繫結到指定IP地址
		server.bind(isa);
		// 設定ServerSocket以非阻塞方式工作
		server.configureBlocking(false);
		// 將server註冊到指定Selector物件
		server.register(selector, SelectionKey.OP_ACCEPT);
		while (selector.select() > 0)
		{
			// 依次處理selector上的每個已選擇的SelectionKey
			for (SelectionKey sk : selector.selectedKeys())
			{
				// 從selector上的已選擇Key集中刪除正在處理的SelectionKey
				selector.selectedKeys().remove(sk);      // ①
				// 如果sk對應的Channel包含客戶端的連線請求
				if (sk.isAcceptable())        // ②
				{
					// 呼叫accept方法接受連線,產生伺服器端的SocketChannel
                    //此處Server就一個可以直接上邊的獲取到,可以不通過SelectionKey 獲取
					SocketChannel sc = server.accept();
					// 設定採用非阻塞模式
					sc.configureBlocking(false);
					// 將該SocketChannel也註冊到selector
					sc.register(selector, SelectionKey.OP_READ);
					// 將sk對應的Channel設定成準備接受其他請求
					sk.interestOps(SelectionKey.OP_ACCEPT);
				}
				// 如果sk對應的Channel有資料需要讀取
				if (sk.isReadable())     // ③
				{
					// 獲取該SelectionKey對應的Channel,該Channel中有可讀的資料
					SocketChannel sc = (SocketChannel)sk.channel();
					// 定義準備執行讀取資料的ByteBuffer
					ByteBuffer buff = ByteBuffer.allocate(1024);
					String content = "";
					// 開始讀取資料
					try
					{
						while(sc.read(buff) > 0)
						{
							buff.flip();
							content += charset.decode(buff);
						}
						// 列印從該sk對應的Channel裡讀取到的資料
						System.out.println("讀取的資料:" + content);
						// 將sk對應的Channel設定成準備下一次讀取
						sk.interestOps(SelectionKey.OP_READ);
					}
					// 如果捕捉到該sk對應的Channel出現了異常,即表明該Channel
					// 對應的Client出現了問題,所以從Selector中取消sk的註冊
					catch (IOException ex)
					{
						// 從Selector中刪除指定的SelectionKey
						sk.cancel();
						if (sk.channel() != null)
						{
							sk.channel().close();
						}
					}
					// 如果content的長度大於0,即聊天資訊不為空
					if (content.length() > 0)
					{
						// 遍歷該selector裡註冊的所有SelectionKey
						for (SelectionKey key : selector.keys())
						{
							// 獲取該key對應的Channel
							Channel targetChannel = key.channel();
							// 如果該channel是SocketChannel物件
							if (targetChannel instanceof SocketChannel)
							{
								// 將讀到的內容寫入該Channel中
								SocketChannel dest = (SocketChannel)targetChannel;
								dest.write(charset.encode(content));
							}
						}
					}
				}
			}
		}
	}
	public static void main(String[] args)
		throws IOException
	{
		new NServer().init();
	}
}
  • 群聊客戶端
public class NClient
{
	// 定義檢測SocketChannel的Selector物件
	private Selector selector = null;
	static final int PORT = 30000;
	// 定義處理編碼和解碼的字符集
	private Charset charset = Charset.forName("UTF-8");
	// 客戶端SocketChannel
	private SocketChannel sc = null;
	public void init()throws IOException
	{
		selector = Selector.open();
		InetSocketAddress isa = new InetSocketAddress("127.0.0.1", PORT);
		// 呼叫open靜態方法建立連線到指定主機的SocketChannel
		sc = SocketChannel.open(isa);
		// 設定該sc以非阻塞方式工作
		sc.configureBlocking(false);
		// 將SocketChannel物件註冊到指定Selector
		sc.register(selector, SelectionKey.OP_READ);
		// 啟動讀取伺服器端資料的執行緒
		new ClientThread().start();
		// 建立鍵盤輸入流
		Scanner scan = new Scanner(System.in);
		while (scan.hasNextLine())
		{
			// 讀取鍵盤輸入
			String line = scan.nextLine();
			// 將鍵盤輸入的內容輸出到SocketChannel中
			sc.write(charset.encode(line));
		}
	}
	// 定義讀取伺服器資料的執行緒
	private class ClientThread extends Thread
	{
		public void run()
		{
			try
			{
				while (selector.select() > 0)    // ①
				{
					// 遍歷每個有可用IO操作Channel對應的SelectionKey
					for (SelectionKey sk : selector.selectedKeys())
					{
						// 刪除正在處理的SelectionKey
						selector.selectedKeys().remove(sk);
						// 如果該SelectionKey對應的Channel中有可讀的資料
						if (sk.isReadable())
						{
							// 使用NIO讀取Channel中的資料
							SocketChannel sc = (SocketChannel)sk.channel();
							ByteBuffer buff = ByteBuffer.allocate(1024);
							String content = "";
							while(sc.read(buff) > 0)
							{
								buff.flip();
								content += charset.decode(buff);
							}
							// 列印輸出讀取的內容
							System.out.println("聊天資訊:" + content);
							// 為下一次讀取作準備
							sk.interestOps(SelectionKey.OP_READ);
						}
					}
				}
			}
			catch (IOException ex)
			{
				ex.printStackTrace();
			}
		}
	}
	public static void main(String[] args)
		throws IOException
	{
		new NClient().init();
	}
}
  • Selector API
  • SelectionKey  API