1. 程式人生 > >Java Socket程式設計(非阻塞多執行緒,NIO)

Java Socket程式設計(非阻塞多執行緒,NIO)

服務端:

伺服器Server類

public class Server implements Runnable {
    private int port;
    private volatile boolean stop;
    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public Server(int port){
        this.port = port;
    }

    public void init(){
        try {
            //開啟一個選擇器
            selector = Selector.open();
            //開啟一個Server-Socket監聽通道
            serverSocketChannel = ServerSocketChannel.open();
            //設定該通道為非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //繫結埠
            serverSocketChannel.socket().bind(new InetSocketAddress(port));
            //將通道註冊在選擇器上面,並將準備連線狀態作為通道訂閱時間
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

            stop = false;
            System.out.println("伺服器已經啟動,埠號:" + port);
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    public void run() {
        init();
        while (!stop){
            try {
                //無論是否有讀寫事件發生,selector每隔1s被喚醒一次
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    //判斷是否準備好接收新進入的連線
                    if(selectionKey.isAcceptable()){
                        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
                        //通過ServerSocketChannel的accept()建立SocketChannel例項
                        //完成該操作意味著完成TCP三次握手,TCP物理鏈路正式建立
                        SocketChannel socketChannel = serverSocketChannel.accept();
                        //設定為非阻塞
                        socketChannel.configureBlocking(false);
                        //在選擇器註冊,並訂閱讀事件
                        socketChannel.register(selector,SelectionKey.OP_READ);
                    }
                    if(selectionKey.isReadable()){
                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                        //建立byteBuffer,並開闢一個1M的緩衝區
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        //讀取請求碼流,返回讀取到的位元組數
                        int readBytes = socketChannel.read(byteBuffer);
                        //判斷客戶端是否斷開
                        if(readBytes < 0){
                            selectionKey.cancel();
                            socketChannel.close();
                            return;
                        }
                        //讀取到位元組,對位元組進行編解碼
                        if(readBytes>0){
                            //將緩衝區從寫模式切換到讀模式
                            byteBuffer.flip();
                            //根據緩衝區可讀位元組數建立位元組陣列
                            byte[] bytes = new byte[byteBuffer.remaining()];
                            //向緩衝區讀資料到位元組陣列
                            byteBuffer.get(bytes);
                            String expression = new String(bytes,"UTF-8");
                            System.out.println("伺服器收到訊息:"+expression);
                        }
                    }
                    iterator.remove();
                }
                selectionKeys.clear();
            }catch (IOException e){
                e.printStackTrace();
            }
        }

        //selector關閉後會自動釋放裡面管理的資源
        if(selector != null){
            try {
                selector.close();
            }catch (IOException e){
                e.printStackTrace();
            }
        }
    }
}

客戶端:

客戶端Client類

public class Client implements Runnable {

    private String host;
    private int port;
    private Selector selector;
    private SocketChannel socketChannel;
    private volatile boolean stop;
    private String name;

    public Client(int port,String name){
        this("localhost",port,name);
    }

    public Client(String host,int port,String name){
        this.host = host;
        this.port = port;
        this.name = name;
    }

    public void init(){
        try {
            //開啟一個選擇器
            selector = Selector.open();
            //開啟一個Socket監聽通道
            socketChannel = SocketChannel.open();
            //設定該通道為非阻塞模式
            socketChannel.configureBlocking(false);
            //在非阻塞模式下,該方法在建立連線之前就會返回結果了,後續為了確認連線是否建立成功,可以呼叫finishConnect()
            socketChannel.connect(new InetSocketAddress(host,port));
            //訂閱連線事件
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
            stop = false;
        }catch (IOException e){
            e.printStackTrace();
        }
    }

    public void run() {
        init();
        int i = 0;
        while (!stop){
            try {
                //無論是否有讀寫事件發生,selector每隔1s被喚醒一次
                selector.select(1000);
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = selectionKeys.iterator();
                while (iterator.hasNext()){
                    SelectionKey selectionKey = iterator.next();
                    //判斷是否連線到伺服器
                    if(selectionKey.isConnectable()){
                        //判斷連線是否建立成功
                        if(socketChannel.finishConnect()){
                            sendMsg(name+" Connect Success!");
                            socketChannel.register(selector,SelectionKey.OP_WRITE);
                        }
                    }
                    if(selectionKey.isWritable()){
                        sendMsg(name+" is saying \"Hello World\"!"+i++);
                        Thread.sleep(1000);
                    }
                    iterator.remove();
                }
                selectionKeys.clear();
            }catch (ConnectException e){
                System.out.println("連線失敗!");
                return;
            }catch (IOException e){
                e.printStackTrace();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }

    public void sendMsg(String expression) throws IOException{
        byte[] bytes = expression.getBytes();
        ByteBuffer byteBuffer = ByteBuffer.allocate(bytes.length);
        byteBuffer.put(bytes);
        //翻轉緩衝區,執行的操作:
        //1.將limit的位置設為position之後的一個位置
        //2.將position的位置重置為0
        byteBuffer.flip();
        socketChannel.write(byteBuffer);
        //清空緩衝區
        byteBuffer.clear();
    }
}

測試過程:

測試類TestNio

public class TestNio {
    public static void main(String[] args) {
        Thread server = new Thread(new Server(10086));
        Thread client1 = new Thread(new Client(10086,"ONE"));
        Thread client2 = new Thread(new Client(10086,"TWO"));
        server.start();
        client1.start();
        client2.start();
    }
}

測試結果:

伺服器已經啟動,埠號:10086
伺服器收到訊息:TWO Connect Success!
伺服器收到訊息:ONE Connect Success!
伺服器收到訊息:ONE is saying "Hello World"!0
伺服器收到訊息:TWO is saying "Hello World"!0
伺服器收到訊息:ONE is saying "Hello World"!1
伺服器收到訊息:TWO is saying "Hello World"!1
伺服器收到訊息:ONE is saying "Hello World"!2
伺服器收到訊息:TWO is saying "Hello World"!2
伺服器收到訊息:ONE is saying "Hello World"!3
伺服器收到訊息:TWO is saying "Hello World"!3
伺服器收到訊息:ONE is saying "Hello World"!4
伺服器收到訊息:TWO is saying "Hello World"!4
伺服器收到訊息:ONE is saying "Hello World"!5
伺服器收到訊息:TWO is saying "Hello World"!5
伺服器收到訊息:ONE is saying "Hello World"!6
伺服器收到訊息:TWO is saying "Hello World"!6
伺服器收到訊息:ONE is saying "Hello World"!7
伺服器收到訊息:TWO is saying "Hello World"!7
伺服器收到訊息:ONE is saying "Hello World"!8
伺服器收到訊息:TWO is saying "Hello World"!8
伺服器收到訊息:ONE is saying "Hello World"!9
伺服器收到訊息:TWO is saying "Hello World"!9
伺服器收到訊息:ONE is saying "Hello World"!10
伺服器收到訊息:TWO is saying "Hello World"!10