1. 程式人生 > >NIO非阻塞原理

NIO非阻塞原理

剛學了NIO,寫一下自己的理解 
網路通訊中,NIO提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道來實現,可以設定阻塞與非阻塞兩種模式,為了實現高負載高併發都採取非阻塞的模式。通道是雙向的,可以同時在通道上傳送和讀取資料。NIO採用可分配大小的緩衝區Buffer實現對資料的讀寫操作。 
伺服器僅採用一個執行緒去處理所有的客戶端執行緒,這就需要建立一個Selector,將ServerSocketChannel和想要監控的SocketChannel註冊到Selector中(用SelectableChannel的register方法,該方法返回一個這個channel向Selector註冊的鍵,是一個SelectionKey例項,它包裝了SelectableChannel和該通道感興趣的操作)。 
Selector就像一個觀察者,不斷地獲取Selector的select方法的返回值,返回值是準備就緒的SelectionKey的數目,然後就進行處理(通過Selector的selectedKeys方法返回被選擇的SelectionKey集合,然後處理連線請求和讀取資料)。 
每個客戶端只有一個SocketChannel,將該SocketChannel註冊到指定的Selector後,監聽該Selector即可。如果監聽到該Selector的select方法的返回值大於0,表明該Selector上有需要進行IO處理的SelectionKey,獲取到SocketChannel後即可處理請求和資料。 
貼一段練習的程式碼:

import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
class NServer 
{
    private Selector selector = null;
    static final int PORT = 30000;
    private Charset charset = Charset.forName("UTF-8");

    public void init() throws IOException
    {
        selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT);
        server.bind(isa);
        //設定ServerSocket以非阻塞方式工作
        server.configureBlocking(false);
        //將server註冊到指定的Selector物件
        server.register(selector,SelectionKey.OP_ACCEPT);

        while(selector.select()>0)
        {
            //依次處理每個已選擇的SelectionKey
            for(SelectionKey sk : selector.selectedKeys())
            {
                selector.selectedKeys().remove(sk);
                //如果sk對應的Channel包含客戶端的連線請求
                if(sk.isAcceptable())
                {
                    SocketChannel sc = server.accept();
                    sc.configureBlocking(false);
                    sc.register(selector,SelectionKey.OP_READ);
                    //將sk對應的Channel設定成準備接受其他請求
                    sk.interestOps(SelectionKey.OP_ACCEPT);
                }
                //如果sk對應的Channel有資料需要讀取
                if(sk.isReadable())
                {
                    //獲取該SelectionKey對應的Channel
                    SocketChannel sc = (SocketChannel)sk.channel();
                    ByteBuffer buff = ByteBuffer.allocate(1024);
                    String content = "";
                    try
                    {
                        while(sc.read(buff)>0)
                        {
                            buff.flip();
                            content += charset.decode(buff);
                        }
                        System.out.println("讀取的資料:"+ content);
                        sk.interestOps(SelectionKey.OP_READ);
                    }
                    //如果捕獲到帶sk對應的Channel出現了異常,即表明該Channel對應的Client出現了問題,
                    //所以從Selector中取消sk的註冊
                    catch (IOException e)
                    {
                        sk.cancel();
                        if(sk.channel() != null)
                            sk.channel().close();
                    }
                    //將收到的資料發給所有註冊的SelectionKey
                    if(content.length()>0)
                    {
                        for(SelectionKey key : selector.keys())
                        {
                            Channel targetChannel = key.channel();
                            if(targetChannel instanceof SocketChannel)
                            {
                                SocketChannel dest = (SocketChannel)targetChannel;
                                dest.write(charset.encode(content));
                            }
                        }
                    }
                }
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        new NServer().init();
    }
}

class NClient
{
    private Selector selector = null;
    static final int PORT = 30000;
    private Charset charset = Charset.forName("UTF-8");
    private SocketChannel sc = null;

    public void init() throws IOException
    {
        selector = Selector.open();
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT);
        sc = SocketChannel.open(isa);
        sc.configureBlocking(false);
        sc.register(selector,SelectionKey.OP_READ);
        //啟動讀取服務端資料的執行緒
        new ClientThread().start();
        //建立鍵盤輸入流
        Scanner scan = new Scanner(System.in);
        while (scan.hasNextLine())
        {
            String line = scan.nextLine();
            sc.write(charset.encode(line));
        }
    }

    //定義讀取服務端資料的執行緒
    private class ClientThread extends Thread
    {
        public void run()
        {
            try
            {
                while (selector.select()>0)
                {
                    //遍歷每個有可用IO操作的Channel對應的SelectionKey
                    for(SelectionKey sk : selector.selectedKeys())
                    {
                        selector.selectedKeys().remove(sk);
                        if(sk.isReadable())
                        {
                            //使用NIO讀取Channel中的資料
                            SocketChannel sc = (SocketChannel)sk.channel();
                            ByteBuffer buff = ByteBuffer.allocate(1024);
                            String content = "";
                            while(sc.read(buff)>0)
                            {
                                sc.read(buff);
                                buff.flip();
                                content += charset.decode(buff);
                            }
                            System.out.println("聊天資訊:"+ content);
                            sk.interestOps(SelectionKey.OP_READ);
                        }
                    }
                }
            }
            catch (IOException e)
            {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws IOException
    {
        new NClient().init();
    }
}