1. 程式人生 > >JAVA NIO 之 Selector 組件

JAVA NIO 之 Selector 組件

重要功能 ret clas 技術 allocated 單線程 byte exce 重要

NIO 重要功能就是實現多路復用。Selector是SelectableChannel對象的多路復用器。一些基礎知識:

選擇器(Selector):選擇器類管理著一個被註冊的通道集合的信息和它們的就緒狀態。

可選擇通道(SelectableChannel):這個抽象類提供了實現通道的可選擇性所需要的公共方法。它是所有支持就緒檢查的通道類的

父類。例如:ServerSocketChannel、SocketChannel。可選擇通道可以被註冊到選擇器上。

選擇鍵(SelectionKey):選擇鍵封裝了特定的通道與特定的選擇器的註冊關系。

前面的一篇文章NIO簡介中介紹了傳統io的同步阻塞服務器實現,現在來看看NIO多路復用服務器的實現。NIO 利用單線程輪詢事件機制,定位就緒的Channel,決定執行什麽,

僅僅 select()方法階段是阻塞的。這樣一個選擇器避免了之前的多個客服端時切換線程的問題。下面的一張圖能描述這種場景:

技術分享圖片

代碼實現:

服務器server:

public class SelectSockets {

    private static int PORT_NUMBER = 9011;

    /**
     * allocateDirect(1024) 此方法創建的buffer無法調用array();直接內存
     */
    private ByteBuffer buffer = ByteBuffer.allocate(1024);

    public static void main(String[] argv) throws Exception {
        new SelectSockets().go(argv);
    }

    public void go(String[] argv) throws Exception {
        System.out.println("Listening on port " + PORT_NUMBER);
        // 創建ServerSocketChannel
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        // 獲得ServerSocket
        ServerSocket serverSocket = serverChannel.socket();
        // 創建Selector
        Selector selector = Selector.open();
        // 綁定
        serverSocket.bind(new InetSocketAddress(PORT_NUMBER));
        // false設置為非阻塞模式
        serverChannel.configureBlocking(false);
        // 註冊通道
        ////ServerSocketChannel只能註冊SelectionKey.OP_ACCEPT;register(Selector sel, int ops)的ops參數可以通過serverSocketChannel.validOps()獲取。
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        while (true) {
            //選擇器select有三種方式,這種帶時間的表示,沒有連接阻塞10秒後繼續或者有連接進來時繼續
            int n = selector.select(10000);
            if (n == 0) {
                continue;
            }
            //selectedKeys()已選擇的鍵
            Iterator it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = (SelectionKey) it.next();
                //檢查是否有效
                if (!key.isValid()) {
                    continue;
                }
                //accept
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
                    SocketChannel channel = server.accept();
                    System.out.println ("Incoming connection from: "+ channel.socket().getRemoteSocketAddress( ));
                    registerChannel(selector, channel, SelectionKey.OP_READ);
                    buffer.clear();
                    buffer.put("你好,我是服務器!\r\n".getBytes());
                    buffer.flip();
                    channel.write(buffer);
                }
                //if(key.isReadable())等價於if((key.readyOps( ) & SelectionKey.OP_READ) != 0)
                if (key.isReadable()) {
                    readHandler(key);
                }
                it.remove();
            }
        }
    }

    /**
     * 設置感興趣的通道屬性
     * @param selector
     * @param channel
     * @param ops
     * @throws Exception
     */
    protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception {
        if (channel == null) {
            return;
        }
        channel.configureBlocking(false);
        channel.register(selector, ops);
    }

    /**
     * 處理讀取數據
     * @param key
     * @throws Exception
     */
    protected void readHandler(SelectionKey key) throws Exception {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int count;
        StringBuilder sb = new StringBuilder();
        ByteBuffer tmpByteBuffer = ByteBuffer.allocate(1024);
        //讀取客服端消息
        while ((count = socketChannel.read(tmpByteBuffer)) > 0) {
            tmpByteBuffer.flip();
            sb.append(new String(tmpByteBuffer.array()));
            // 這裏可以回寫給客服端
            while (tmpByteBuffer.hasRemaining()) {
                socketChannel.write(tmpByteBuffer);
            }
            tmpByteBuffer.clear();
        }
        System.out.println("客服端"+socketChannel.socket().getRemoteSocketAddress()+"說:"+sb.toString());

        if (count < 0) {
            // Close channel on EOF, invalidates the key
            socketChannel.close();
        }
    }

}

  客服端:

/**
 * @author tangquanbin
 * @date 2018/10/23 22:23
 */
public class Client {

    private static final int BUFFER_SIZE = 1024;
    private static  int PORT = 9011;
    private static String[] messages =
            {"今天讀到一句話,覺得很好:但行好事,莫問前程。"};

    public static void main(String[] args) {
        try {
            InetAddress inetAddress = InetAddress.getLocalHost();
            InetSocketAddress address =new InetSocketAddress(inetAddress, PORT);
            SocketChannel socketChannel = SocketChannel.open(address);

            for (String msg: messages) {
                ByteBuffer myBuffer=ByteBuffer.allocate(BUFFER_SIZE);
                myBuffer.put(msg.getBytes());
                myBuffer.flip();
                int bytesWritten = socketChannel.write(myBuffer);
                logger(String.format("Sending Message...: %s\nbytesWritten...: %d",msg, bytesWritten));
            }
            logger("Closing Client connection...");
            socketChannel.close();
        } catch (IOException e) {
            logger(e.getMessage());
            e.printStackTrace();
        }
    }

    public static void logger(String msg) {
        System.out.println(msg);
    }

}

  也可以用telnet命令測試:

telnet 127.0.0.1 9011

JAVA NIO 之 Selector 組件