1. 程式人生 > >使用NIO實現非阻塞Socket通訊原理

使用NIO實現非阻塞Socket通訊原理

剛學了NIO,寫一下自己的理解
網路通訊中,NIO提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道來實現,可以設定阻塞與非阻塞兩種模式,為了實現高負載高併發都採取非阻塞的模式。通道是雙向的,可以同時在通道上傳送和讀取資料。NIO採用可分配大小的緩衝區Buffer實現對資料的讀寫操作。
伺服器僅採用一個執行緒去處理所有的客戶端執行緒,這就需要建立一個Selector,將ServerSocketChannel和想要監控的SocketChannel註冊到Selector中(用SelectableChannel的register方法,該方法返回一個這個channel向Selector註冊的鍵,是一個SelectionKey例項,它包裝了SelectableChannel和該通道感興趣的操作)。
Selector就像一個觀察者,不斷地獲取Selector的select方法的返回值,返回值是準備就緒的SelectionKey的數目,然後就進行處理(通過Selector的selectedKeys方法返回被選擇的SelectionKey集合,然後處理連線請求和讀取資料)。
每個客戶端只有一個SocketChannel,將該SocketChannel註冊到指定的Selector後,監聽該Selector即可。如果監聽到該Selector的select方法的返回值大於0,表明該Selector上有需要進行IO處理的SelectionKey,獲取到SocketChannel後即可處理請求和資料。
貼一段練習的程式碼:

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
class NServer 
{
    private Selector selector = null;
    static final int PORT = 30000;
    private Charset charset = Charset.forName("UTF-8");

    public void init() throws
IOException { selector = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT); server.bind(isa); //設定ServerSocket以非阻塞方式工作 server.configureBlocking(false); //將server註冊到指定的Selector物件
server.register(selector,SelectionKey.OP_ACCEPT); while(selector.select()>0) { //依次處理每個已選擇的SelectionKey for(SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); //如果sk對應的Channel包含客戶端的連線請求 if(sk.isAcceptable()) { SocketChannel sc = server.accept(); sc.configureBlocking(false); sc.register(selector,SelectionKey.OP_READ); //將sk對應的Channel設定成準備接受其他請求 sk.interestOps(SelectionKey.OP_ACCEPT); } //如果sk對應的Channel有資料需要讀取 if(sk.isReadable()) { //獲取該SelectionKey對應的Channel SocketChannel sc = (SocketChannel)sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; try { while(sc.read(buff)>0) { buff.flip(); content += charset.decode(buff); } System.out.println("讀取的資料:"+ content); sk.interestOps(SelectionKey.OP_READ); } //如果捕獲到帶sk對應的Channel出現了異常,即表明該Channel對應的Client出現了問題, //所以從Selector中取消sk的註冊 catch (IOException e) { sk.cancel(); if(sk.channel() != null) sk.channel().close(); } //將收到的資料發給所有註冊的SelectionKey if(content.length()>0) { for(SelectionKey key : selector.keys()) { Channel targetChannel = key.channel(); if(targetChannel instanceof SocketChannel) { SocketChannel dest = (SocketChannel)targetChannel; dest.write(charset.encode(content)); } } } } } } } public static void main(String[] args) throws IOException { new NServer().init(); } } class NClient { private Selector selector = null; static final int PORT = 30000; private Charset charset = Charset.forName("UTF-8"); private SocketChannel sc = null; public void init() throws IOException { selector = Selector.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT); sc = SocketChannel.open(isa); sc.configureBlocking(false); sc.register(selector,SelectionKey.OP_READ); //啟動讀取服務端資料的執行緒 new ClientThread().start(); //建立鍵盤輸入流 Scanner scan = new Scanner(System.in); while (scan.hasNextLine()) { String line = scan.nextLine(); sc.write(charset.encode(line)); } } //定義讀取服務端資料的執行緒 private class ClientThread extends Thread { public void run() { try { while (selector.select()>0) { //遍歷每個有可用IO操作的Channel對應的SelectionKey for(SelectionKey sk : selector.selectedKeys()) { selector.selectedKeys().remove(sk); if(sk.isReadable()) { //使用NIO讀取Channel中的資料 SocketChannel sc = (SocketChannel)sk.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; while(sc.read(buff)>0) { sc.read(buff); buff.flip(); content += charset.decode(buff); } System.out.println("聊天資訊:"+ content); sk.interestOps(SelectionKey.OP_READ); } } } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args) throws IOException { new NClient().init(); } }