1. 程式人生 > >走進分散式Java開發(一)—Java NIO

走進分散式Java開發(一)—Java NIO

概述

何為NIO,百度百科上的解釋為:

java.nio全稱java non-blocking IO(實際上是 new io),是指jdk1.4 及以上版本里提供的新api(New IO) ,為所有的原始型別(boolean型別除外)提供快取支援的資料容器,使用它可以提供非阻塞式的高伸縮性網路。

為所有的原始型別提供(Buffer)快取支援。字符集編碼解碼解決方案。 Channel :一個新的原始I/O 抽象。 支援鎖和記憶體對映檔案的檔案訪問介面。 提供多路(non-blocking) 非阻塞式的高伸縮性網路I/O 。

簡單來說,傳統Java是面向物件的,而NIO是面向緩衝區的,傳統的IO是阻塞的,NIO是非阻塞的。

核心部分

通道

通道(channel)和流(stream)最大的不同,就是流是單向的,通道是雙向的。

  • FileChannel(IO)
  • DatagramChannel(UDP)
  • SocketChannel(TCP)
  • ServerSocketChannel(TCP)

緩衝區

NIO中的關鍵Buffer實現有:ByteBuffer, CharBuffer, DoubleBuffer, FloatBuffer, IntBuffer, LongBuffer, ShortBuffer,分別對應基本資料型別: byte, char, double, float, int, long, short。當然NIO中還有MappedByteBuffer, HeapByteBuffer, DirectByteBuffer等

Selector

Selector執行單執行緒處理多個Channel,如果你的應用打開了多個通道,但每個連線的流量都很低,使用Selector()方法就會很方便,例如在一個聊天伺服器中。要使用Selector, 得向Selector註冊Channel,然後呼叫它的select()方法。這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,執行緒就可以處理這些事件,事件的例子有如新的連線進來、資料接收等。

NIO TCP例項

首先

向buffer中寫資料用fileChannel.read()

向buffer中讀資料用fileChannel.write()

read返回值為-1的時候表示連線斷開

客戶端程式碼用NIO寫:

package NIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.concurrent.TimeUnit;
import java.util.logging.SocketHandler;

public class Client {

    public static void main(String[] args){
            client();
    }

    public static void client(){
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        SocketChannel socketChannel = null;
        try{
            socketChannel = SocketChannel.open();
            socketChannel.configureBlocking(false);
            socketChannel.connect(new InetSocketAddress("127.0.0.1",9999));
            if(socketChannel.finishConnect()){
                int i = 0;
                while(true){
                    TimeUnit.SECONDS.sleep(1);
                    String info = "I'm" + i++ + "-th information from client";
                    buffer.clear();
                    buffer.put(info.getBytes());
                    buffer.flip();//flip方法將Buffer從寫模式切換到讀模式。呼叫flip()方法會將position設回0,並將limit設定成之前position的值。
                    while(buffer.hasRemaining()){
                        System.out.println(buffer);
                        socketChannel.write(buffer);
                    }
                }
            }

        }catch (IOException | InterruptedException e){
            e.printStackTrace();
        }
        finally {
            try{
                if(socketChannel != null){
                    socketChannel.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
}

服務端程式碼先用傳統IO來寫:

package NIO;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.util.stream.IntStream;

public class Server {

    public static void main(String[] arg){
        server();
    }

    public static void server(){
        ServerSocket serverSocket = null;
        InputStream in = null;
        try{
            serverSocket = new ServerSocket(9999);
            int recvMsgSize = 0;
            byte[] recvBuf = new byte[1024];
            while (true){
                Socket clntSocket = serverSocket.accept();
                SocketAddress clientAddress = clntSocket.getRemoteSocketAddress();//獲取另一端的IP和埠
                System.out.println("Handing client at"+clientAddress);
                in = clntSocket.getInputStream();
                while((recvMsgSize = in.read(recvBuf)) != -1){
                    byte[] temp = new byte[recvMsgSize];
                    System.arraycopy(recvBuf,0,temp,0,recvMsgSize);
                    System.out.println(new String(temp));
                }
            }
        }catch (IOException e){
            e.printStackTrace();
        }
        finally {
            try{
                if(serverSocket != null){
                    serverSocket.close();
                }
                if(in != null){
                    in.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
}

用NIO寫的服務端:

package NIO;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class ServerConnect {
    private static final int BUF_SIZE = 1024;
    private static final int PORT = 9999;
    private static final int TIMEOUT = 3000;

    public static void main(String[] args){
        selector();
    }

    public static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel ssChannel = (ServerSocketChannel)key.channel();
        SocketChannel sc = ssChannel.accept();
        sc.configureBlocking(false);
        sc.register(key.selector(),SelectionKey.OP_READ, ByteBuffer.allocateDirect(BUF_SIZE));//
        用selector必須要註冊
    }

    public static void handleRead(SelectionKey key)throws IOException{
        SocketChannel sc = (SocketChannel)key.channel();
        ByteBuffer buf = (ByteBuffer)key.attachment();
        long bytesRead = sc.read(buf);
        while(bytesRead > 0){
            buf.flip();
            while(buf.hasRemaining()){
                System.out.println((char)buf.get());
            }
            System.out.println();
            buf.clear();
            bytesRead = sc.read(buf);
        }
        if(bytesRead == -1){
            sc.close();
        }
    }

    public static void handleWrite(SelectionKey key)throws IOException{
        ByteBuffer buf = (ByteBuffer)key.attachment();
        buf.flip();
        SocketChannel sc = (SocketChannel)key.channel();
        while(buf.hasRemaining()){
            sc.write(buf);
        }
        buf.compact();//將所有未讀的資料拷貝到Buffer起始處。然後將position設到最後一個未讀元素正後面
    }

    public static void selector(){
        Selector selector = null;
        ServerSocketChannel ssc = null;
        try{
            selector = Selector.open();
            ssc = ServerSocketChannel.open();
            ssc.socket().bind(new InetSocketAddress(PORT));
            ssc.configureBlocking(false);
            ssc.register(selector,SelectionKey.OP_ACCEPT);
            while(true){
                if(selector.select(TIMEOUT) == 0){
                    System.out.println("==");
                    continue;
                }
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
                while(iter.hasNext()){
                    SelectionKey key = iter.next();
                    if(key.isAcceptable()){
                        handleAccept(key);
                    }
                    if(key.isReadable()){
                        handleRead(key);
                    }
                    if(key.isWritable() && key.isValid()){
                        handleWrite(key);
                    }
                    if(key.isConnectable()){
                        System.out.println("isConnectable = true");
                    }
                    iter.remove();
                }
            }
        }catch (IOException e){
            e.printStackTrace();
        }finally {
            try{
                if(selector != null){
                    selector.close();
                }
                if(ssc != null){
                    ssc.close();
                }
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
}