1. 程式人生 > >Nio使用Selector客戶端與服務器的通信

Nio使用Selector客戶端與服務器的通信

高性能 buffer 模型 tar 簡單 next() 是個 mes 實例

使用NIO的一個最大優勢就是客戶端於服務器自己的不再是阻塞式的,也就意味著服務器無需通過為每個客戶端的鏈接而開啟一個線程。而是通過一個叫Selector的輪循器來不斷的檢測那個Channel有消息處理。
簡單來講,Selector會不斷地輪詢註冊在其上的Channel,如果某個Channel上面有新的TCP連接接入、讀和寫事件,這個Channel就處於就緒狀態,會被Selector輪詢出來,然後通過SelectionKey可以獲取就緒Channel的Set集合,進行後續的I/O操作。
由於select操作只管對selectedKeys的集合進行添加而不負責移除,所以當某個消息被處理後我們需要從該集合裏去掉。

一個多路復用器Selector可以同時輪詢多個Channel,由於JDK使用了epoll()代替傳統的select實現,所以它並沒有最大連接句柄1024/2048的限制。這也就意味著只需要一個線程負責Selector的輪詢,就可以接入成千上萬的客戶端,這確實是個非常巨大的進步。

下面,我們通過NIO編程的序列圖和源碼分析來熟悉相關的概念,以便鞏固我們前面所學的NIO基礎知識。

è¿éåå¾çæè¿°

下面,我們對NIO服務端的主要創建過程進行講解和說明,作為NIO的基礎入門,我們將忽略掉一些在生產環境中部署所需要的一些特性和功能(比如TCP半包等問題)。

步驟一:打開ServerSocketChannel,用於監聽客戶端的連接,它是所有客戶端連接的父管道,代碼示例如下。

ServerSocketChannel server = ServerSocketChannel.open();
步驟二:綁定監聽端口,設置連接為非阻塞模式,示例代碼如下。

server.socket().bind(new InetSocketAddress(7777),1024);
// 設置為非阻塞模式, 這個非常重要
server.configureBlocking(false);
步驟三:創建Reactor線程,創建多路復用器並啟動線程,代碼如下。(即 selector)

Selector selector = Selector.open();
new Thread(new ReactorTask()).start();
步驟四:將ServerSocketChannel註冊到Reactor線程的多路復用器Selector上,監聽ACCEPT事件,代碼如下。

server.register(selector, SelectionKey.OP_ACCEPT);
步驟五:多路復用器在線程run方法的無限循環體內輪詢準備就緒的Key,代碼如下。

while(true){
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()) {
key = (SelectKey)it.next();
//處理io
}
}
步驟六:多路復用器監聽到有新的客戶端接入,處理新的接入請求,完成TCP三次握手,建立物理鏈路,代碼示例如下。

// 得到與客戶端的套接字通道
SocketChannel channel = ssc.accept();
步驟七:設置客戶端鏈路為非阻塞模式,示例代碼如下。

channel.configureBlocking(false);
步驟八:將新接入的客戶端連接註冊到Reactor線程的多路復用器上,監聽讀操作,用來讀取客戶端發送的網絡消息,代碼如下。

channel.register(selector, SelectionKey.OP_READ);
步驟九:異步讀取客戶端請求消息到緩沖區,示例代碼如下。

int readBytes = channel.read(byteBuffer);
步驟十:對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排,示例代碼如下。

Object message = null;
while (buffer.hasRemain()) {
buffer.mark();
message = decode(buffer);
if(message == null){
buffer.reset();
break;
}
}
if(!buffer.hasRemain()){
buffer.clear();
}else {
buffer.compact();
}

//業務線程處理message
步驟十一:將POJO對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端,示例代碼如下。

socketChannel.write(byteBuffer);
註意:如果發送區TCP緩沖區滿,會導致寫半包,此時,需要註冊監聽寫操作位,循環寫,直到整包消息寫入TCP緩沖區,此處不贅述,後續會詳細分析Netty的處理策略。

package chanel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NioService {
    public static void main(String[] args) {
        try {
            NioService server = new NioService();
            server.init();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public  void init() throws IOException {
        Charset charset = Charset.forName("UTF-8");
        // 創建一個選擇器,可用close()關閉,isOpen()表示是否處於打開狀態,他不隸屬於當前線程
        Selector selector = Selector.open();
        // 創建ServerSocketChannel,並把它綁定到指定端口上
        ServerSocketChannel server = ServerSocketChannel.open();
        server.socket().bind(new InetSocketAddress(7777),1024);
        // 設置為非阻塞模式, 這個非常重要
        server.configureBlocking(false);
        // 在選擇器裏面註冊關註這個服務器套接字通道的accept事件
        // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用於SocketChannel
        server.register(selector, SelectionKey.OP_ACCEPT);


        while (true) {
            //休眠時間為1s,無論是否有讀寫等事件發生,selector每隔1s都被喚醒一次
            selector.select(1000);
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) {
                //如果key對應的Channel包含客戶端的鏈接請求
                // OP_ACCEPT 這個只有ServerSocketChannel才有可能觸發
                key=it.next();
                // 由於select操作只管對selectedKeys進行添加,所以key處理後我們需要從裏面把key去掉
                it.remove();
                if (key.isAcceptable()) {
                    ServerSocketChannel ssc  = (ServerSocketChannel) key.channel();
                    // 得到與客戶端的套接字通道
                    //ServerSocketChannel的accept接收客戶端的連接請求並創建SocketChannel實例,完成上述操作後,相當於完成了TCP的三次握手,TCP物理鏈路正式建立。
                    //我們需要將新創建的SocketChannel設置為異步非阻塞,同時也可以對其TCP參數進行設置,例如TCP接收和發送緩沖區的大小等。此處省掉
                    SocketChannel channel = ssc.accept();
                    channel.configureBlocking(false);
                    channel.register(selector, SelectionKey.OP_READ);
                    //將key對應Channel設置為準備接受其他請求
                    key.interestOps(SelectionKey.OP_ACCEPT);
                }
                if (key.isReadable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                    String content = "";
                    try {
                        int readBytes = channel.read(byteBuffer);
                        if (readBytes > 0) {
                            byteBuffer.flip(); //為write()準備
                            byte[] bytes = new byte[byteBuffer.remaining()];
                            byteBuffer.get(bytes);
                            content+=new String(bytes);
                            System.out.println(content);
                            //回應客戶端
                            doWrite(channel);
                        }
                        // 寫完就把狀態關註去掉,否則會一直觸發寫事件(改變自身關註事件)
                        key.interestOps(SelectionKey.OP_READ);
                    } catch (IOException i) {
                        //如果捕獲到該SelectionKey對應的Channel時出現了異常,即表明該Channel對於的Client出現了問題
                        //所以從Selector中取消該SelectionKey的註冊
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            }
        }
    }
    private  void doWrite(SocketChannel sc) throws IOException{
        byte[] req ="服務器已接受aaa".getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
        byteBuffer.put(req);
        byteBuffer.flip();
        sc.write(byteBuffer);
        if(!byteBuffer.hasRemaining()){
            System.out.println("Send 2 Service successed");
        }
    }
}


現在我們來看編寫客戶端的流程:

è¿éåå¾çæè¿°

步驟一:打開SocketChannel,綁定客戶端本地地址(可選,默認系統會隨機分配一個可用的本地地址),示例代碼如下。

SocketChannel channel = SocketChannel.open();
步驟二:設置SocketChannel為非阻塞模式,同時設置客戶端連接的TCP參數,示例代碼如下。

channel.configureBlocking(false);
步驟三:異步連接服務端,示例代碼如下。

步驟四:判斷是否連接成功,如果連接成功,則直接註冊讀狀態位到多路復用器中,如果當前沒有連接成功(異步連接,返回false,說明客戶端已經發送sync包,服務端沒有返回ack包,物理鏈路還沒有建立),示例代碼如下。

if(channel.connect(new InetSocketAddress("127.0.0.1",7777))){
channel.register(selector, SelectionKey.OP_READ);
//發送消息
doWrite(channel, "66666666");
}else {
channel.register(selector, SelectionKey.OP_CONNECT);
}
步驟五:向Reactor線程的多路復用器註冊OP_CONNECT狀態位,監聽服務端的TCP ACK應答,示例代碼如下。

channel.register(selector, SelectionKey.OP_CONNECT);
步驟六:創建Reactor線程,創建多路復用器並啟動線程,代碼如下。

selector = Selector.open();
new Thread(new ReactorTask()).start();
步驟七:多路復用器在線程run方法的無限循環體內輪詢準備就緒的Key,代碼如下。

while (!stop){
selector.select(1000);
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while (it.hasNext()){
}
}
步驟八:接收connect事件進行處理並判斷是否鏈接成功,示例代碼如下。

if (key.isConnectable()){
if (channel.finishConnect()) {
}
}
步驟九:註冊讀事件到多路復用器,示例代碼如下。

channel.register(selector, SelectionKey.OP_READ);
步驟十:異步讀客戶端請求消息到緩沖區,示例代碼如下。

int readBytes = channel.read(byteBuffer);
步驟十一:對ByteBuffer進行編解碼,如果有半包消息指針reset,繼續讀取後續的報文,將解碼成功的消息封裝成Task,投遞到業務線程池中,進行業務邏輯編排,示例代碼如下。

Object message = null;
while (buffer.hasRemain()) {
buffer.mark();
message = decode(buffer);
if(message == null){
buffer.reset();
break;
}
}
if(!buffer.hasRemain()){
buffer.clear();
}else {
buffer.compact();
}

//業務線程處理message
步驟十二:將發生對象encode成ByteBuffer,調用SocketChannel的異步write接口,將消息異步發送給客戶端,示例代碼如下。

socketChannel.write(byteBuffer);

package chanel;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;

public class NioClient {
    public static void main(String[] args) {
        try {
            NioClient client = new NioClient();
            client.init();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    // 創建一個套接字通道,註意這裏必須使用無參形式
    private Selector selector = null;
    static Charset charset = Charset.forName("UTF-8");
    private volatile boolean stop = false;
    public ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8);

    public void init() throws IOException {
        selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        // 設置為非阻塞模式,這個方法必須在實際連接之前調用(所以open的時候不能提供服務器地址,否則會自動連接)
        channel.configureBlocking(false);
        if (channel.connect(new InetSocketAddress("127.0.0.1", 7777))) {
            channel.register(selector, SelectionKey.OP_READ);
            //發送消息
            doWrite(channel, "66666666");
        } else {
            channel.register(selector, SelectionKey.OP_CONNECT);
        }


        //啟動一個接受服務器反饋的線程
        //  new Thread(new ReceiverInfo()).start();

        while (!stop) {
            selector.select(1000);
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            SelectionKey key = null;
            while (it.hasNext()) {
                key = it.next();
                it.remove();
                SocketChannel sc = (SocketChannel) key.channel();
                // OP_CONNECT 兩種情況,鏈接成功或失敗這個方法都會返回true
                if (key.isConnectable()) {
                    // 由於非阻塞模式,connect只管發起連接請求,finishConnect()方法會阻塞到鏈接結束並返回是否成功
                    // 另外還有一個isConnectionPending()返回的是是否處於正在連接狀態(還在三次握手中)
                    if (channel.finishConnect()) {
                       /* System.out.println("準備發送數據");
                        // 鏈接成功了可以做一些自己的處理
                        channel.write(charset.encode("I am Coming"));
                        // 處理完後必須吧OP_CONNECT關註去掉,改為關註OP_READ
                        key.interestOps(SelectionKey.OP_READ);*/
                        sc.register(selector, SelectionKey.OP_READ);
                        //    new Thread(new DoWrite(channel)).start();
                        doWrite(channel, "66666666");
                    } else {
                        //鏈接失敗,進程推出或直接拋出IOException
                        System.exit(1);
                    }
                }
                if (key.isReadable()) {
                    //讀取服務端的響應
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(buffer);
                    String content = "";
                    if (readBytes > 0) {
                        buffer.flip();
                        byte[] bytes = new byte[buffer.remaining()];
                        buffer.get(bytes);
                        content += new String(bytes);
                        stop = true;
                    } else if (readBytes < 0) {
                        //對端鏈路關閉
                        key.channel();
                        sc.close();
                    }
                    System.out.println(content);
                    key.interestOps(SelectionKey.OP_READ);
                }
            }
        }
    }

    private void doWrite(SocketChannel sc, String data) throws IOException {
        byte[] req = data.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
        byteBuffer.put(req);
        byteBuffer.flip();
        sc.write(byteBuffer);
        if (!byteBuffer.hasRemaining()) {
            System.out.println("Send 2 client successed");
        }
    }
}

啟動客戶端類的init()方法之後就會向服務器發送一個字符串,服務器接受到之後會向客戶端回應一個。運行如上代碼,結果正確。

通過源碼對比分析,我們發現NIO編程難度確實比同步阻塞BIO大很多,我們的NIO例程並沒有考慮“半包讀”和“半包寫”,如果加上這些,代碼將會更加復雜。NIO代碼既然這麽復雜,為什麽它的應用卻越來越廣泛呢,使用NIO編程的優點總結如下。

(1)客戶端發起的連接操作是異步的,可以通過在多路復用器註冊OP_CONNECT等待後續結果,不需要像之前的客戶端那樣被同步阻塞。

(2)SocketChannel的讀寫操作都是異步的,如果沒有可讀寫的數據它不會同步等待,直接返回,這樣I/O通信線程就可以處理其他的鏈路,不需要同步等待這個鏈路可用。

3)線程模型的優化:由於JDK的Selector在Linux等主流操作系統上通過epoll實現,它沒有連接句柄數的限制(只受限於操作系統的最大句柄數或者對單個進程的句柄限制),這意味著一個Selector線程可以同時處理成千上萬個客戶端連接,而且性能不會隨著客戶端的增加而線性下降,因此,它非常適合做高性能、高負載的網絡服務器。

JDK1.7升級了NIO類庫,升級後的NIO類庫被稱為NIO2.0,引人註目的是,Java正式提供了異步文件I/O操作,同時提供了與UNIX網絡編程事件驅動I/O對應的AIO,後續我們學習下如何利用NIO2.0編寫AIO程序,還是以客戶端服務器通信為例進行講解。

Nio使用Selector客戶端與服務器的通信