1. 程式人生 > >java Nio 使用 NioSocket 客戶端與服務端互動實現

java Nio 使用 NioSocket 客戶端與服務端互動實現

NioSocket 客戶端與服務端互動實現


       java Nio是jdk1.4新增的io方式—–nio(new IO),這種方式在目前來說算不算new,更合適的解釋應該是non-block IO。

       non-block是相對於傳統的io方式來講的。傳統的Io方式是阻塞的,我們拿網路io來舉例,傳統的io模型如下:
這裡寫圖片描述

       服務端主執行緒負責不斷地server.accept(),如果沒有客戶端請求主執行緒就會阻塞,當客戶端請求時,主執行緒會通過執行緒池建立一個新的執行緒執行。簡單解釋就是一個執行緒負責一個客戶端的socket,當客戶端因網路等原因傳遞速度慢的時候,服務端對應的客戶端的執行緒就會等待,很浪費資源。同時執行緒過少的話會影響服務的吞吐量,而執行緒過多的話由於上下文切換等原因會導致效率十分低下,傳統的io方式並不適合如今的網路流量。

       Nio的模型如下:
這裡寫圖片描述



       nio相比傳統的io模型,最大的特點是優化了執行緒的使用。nio通過selector可以使用一個執行緒去管理多個socket控制代碼,說是管理也不太合適,nio是採用的事件驅動模型,selector負責的是監控各個連線控制代碼的狀態,不是去輪詢每個控制代碼,而是在資料就緒後,將訊息通知給selector,而具體的socket控制代碼管理則是採用多路複用的模型,交由作業系統來完成。selector充當的是一個訊息的監聽者,負責監聽channel在其註冊的事件,這樣就可以通過一個執行緒完成了大量連線的管理,當註冊的事件發生後,再呼叫相應執行緒進行處理。這樣就不需要為每個連線都使用一個執行緒去維持長連線,減少了長連線的開銷,同時減少了上下文的切換提高了系統的吞吐量。

java Nio的組成

java Nio主要由三個核心部分組成:
- Buffer
- Channel
- Selector


       所有的io的Nio都是從一個channel開始的,Channel有點類似於流,但是和流不同的是,channel是可以雙向讀寫的。Channel有幾種型別,主要包含檔案io操作和網路io:
- FileChannel (檔案io)
- DatagramChannel (udp資料報)
- SocketChannel (tcp客戶端)
- ServerSocketChannel (tcp服務端)


       Buffer是一箇中間快取區,資料可以從channel讀取到buffer,也可以從buffer寫到channel中,在java中,傳統方式與io的互動,需要將資料從堆記憶體讀取到直接記憶體中,然後交由c語言來呼叫系統服務完成io的互動。而使用Buffer可以直接在直接記憶體中開闢記憶體區域,減少了io複製的操作,從而提高了io操作的效率。

#基本資料型別的buffer
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer


#檔案記憶體對映buffer
- MappedByteBuffer


#直接記憶體區buffer
- DirectBuffer


       Selector允許單個執行緒處理多個channel,可以將多個channel教給selector管理,並註冊相應的事件,而selector則採用事件驅動的方式,當註冊的事件就緒後,呼叫相應的相應的執行緒處理該時間,不用使用執行緒去維持長連線,減少了執行緒的開銷。Selector通過靜態工廠的open方法建立,然後通過channel的register註冊到Channel上。註冊後通過select方法等待請求,select請求有long型別引數,代表等待時間,如果等待時間內接受到操作請求,則返回可以操作請求的數量,否則超時往下走。傳入引數為零或者無參方法,則會採用阻塞模式知道有相應請求。收到請求後呼叫selectedKeys返回SelectionKey的集合。SelectionKey儲存了處理當前請求的Channel和Selector,並且提供了不同的操作型別。SelectionKey的操作有四種:
- SelectionKey.OP_CONNECT
- SelectionKey.OP_ACCEPT
- SelectionKey.OP_READ
- SelectionKey.OP_WRITE

下面為一個客戶端與服務端實用NioSocket互動的簡單例子:

//對selectionKey事件的處理
/**
 * description:
 *
 * @author wkGui
 */
interface ServerHandlerBs {
    void handleAccept(SelectionKey selectionKey) throws IOException;

    String handleRead(SelectionKey selectionKey) throws IOException;
}


/**
 * description:
 *
 * @author wkGui
 */
public class ServerHandlerImpl implements ServerHandlerBs {
    private int bufferSize = 1024;
    private String localCharset = "UTF-8";

    public ServerHandlerImpl() {
    }

    public ServerHandlerImpl(int bufferSize) {
        this(bufferSize, null);
    }

    public ServerHandlerImpl(String localCharset) {
        this(-1, localCharset);
    }

    public ServerHandlerImpl(int bufferSize, String localCharset) {
        this.bufferSize = bufferSize > 0 ? bufferSize : this.bufferSize;
        this.localCharset = localCharset == null ? this.localCharset : localCharset;
    }

    @Override
    public void handleAccept(SelectionKey selectionKey) throws IOException {
        //獲取channel
        SocketChannel socketChannel = ((ServerSocketChannel) selectionKey.channel()).accept();
        //非阻塞
        socketChannel.configureBlocking(false);
        //註冊selector
        socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));

        System.out.println("建立請求......");
    }

    @Override
    public String handleRead(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();

        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();

        String receivedStr = "";

        if (socketChannel.read(buffer) == -1) {
            //沒讀到內容關閉
            socketChannel.shutdownOutput();
            socketChannel.shutdownInput();
            socketChannel.close();
            System.out.println("連線斷開......");
        } else {
            //將channel改為讀取狀態
            buffer.flip();
            //按照編碼讀取資料
            receivedStr = Charset.forName(localCharset).newDecoder().decode(buffer).toString();
            buffer.clear();

            //返回資料給客戶端
            buffer = buffer.put(("received string : " + receivedStr).getBytes(localCharset));
            //讀取模式
            buffer.flip();
            socketChannel.write(buffer);
            //註冊selector 繼續讀取資料
            socketChannel.register(selectionKey.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
        }
        return receivedStr;
    }

}
//服務端server類
/**
 * description:
 *
 * @author wkGui
 */
public class NioSocketServer {

    private volatile byte flag = 1;

    public void setFlag(byte flag) {
        this.flag = flag;
    }

    public void start() {
        //建立serverSocketChannel,監聽8888埠
        try (ServerSocketChannel serverSocketChannel = ServerSocketChannel.open()) {
            serverSocketChannel.socket().bind(new InetSocketAddress(8888));
            //設定為非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //為serverChannel註冊selector
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            System.out.println("服務端開始工作:");

            //建立訊息處理器
            ServerHandlerBs handler = new ServerHandlerImpl(1024);

            while (flag == 1) {
                selector.select();
                System.out.println("開始處理請求 : ");
                //獲取selectionKeys並處理
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while (keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    try {
                        //連線請求
                        if (key.isAcceptable()) {
                            handler.handleAccept(key);
                        }
                        //讀請求
                        if (key.isReadable()) {
                            System.out.println(handler.handleRead(key));
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    //處理完後移除當前使用的key
                    keyIterator.remove();
                }
                System.out.println("完成請求處理。");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

//server端啟動類
/**
 * description:
 *
 * @author wkGui
 */
public class ServerMain {
    public static void main(String[] args) {
        NioSocketServer server = new NioSocketServer();
        new Thread(() -> {
            try {
                Thread.sleep(10*60*1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                server.setFlag((byte) 0);
            }
        }).start();
        server.start();
    }
}
//客戶端client類
/**
 * description:
 *
 * @author wkGui
 */
public class NioSocketClient {
    public void start() {
        try (SocketChannel socketChannel = SocketChannel.open()) {
            //連線服務端socket
            SocketAddress socketAddress = new InetSocketAddress("localhost", 8888);
            socketChannel.connect(socketAddress);

            int sendCount = 0;

            ByteBuffer buffer = ByteBuffer.allocate(1024);

            //這裡最好使用selector處理   這裡只是為了寫的簡單
            while (sendCount < 10) {
                buffer.clear();
                //向服務端傳送訊息
                buffer.put(("current time : " + System.currentTimeMillis()).getBytes());
                //讀取模式
                buffer.flip();
                socketChannel.write(buffer);
                buffer.clear();

                //從服務端讀取訊息
                int readLenth = socketChannel.read(buffer);
                //讀取模式
                buffer.flip();
                byte[] bytes = new byte[readLenth];
                buffer.get(bytes);
                System.out.println(new String(bytes, "UTF-8"));
                buffer.clear();


                sendCount++;
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

//client啟動類
/**
 * description:
 *
 * @author wkGui
 */
public class ClientMain {
    public static void main(String[] args) {
        new NioSocketClient().start();
    }
}