1. 程式人生 > >NIO實現TCP的非阻塞通訊

NIO實現TCP的非阻塞通訊

這一次寫NIO實現非阻塞通訊時遇到了很多問題,我所理解的非阻塞是對於一個使用者而言它的讀寫不會相互制約,而在此次編寫過程中,發現其實非阻塞是相對於多個使用者而言的。
看到網上一個對同步非同步阻塞非阻塞的例子,感覺挺好的,就拷過來了:

老張愛喝茶,廢話不說,煮開水。
出場人物:老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。
1 老張把水壺放到火上,立等水開。(同步阻塞)
老張覺得自己有點傻
2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞)
老張還是覺得自己有點傻,於是變高端了,買了把會響笛的那種水壺。水開之後,能大聲發出嘀~~~~的噪音。
3
老張把響水壺放到火上,立等水開。(非同步阻塞) 老張覺得這樣傻等意義不大 4 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(非同步非阻塞) 老張覺得自己聰明瞭。 所謂同步非同步,只是對於水壺而言。 普通水壺,同步;響水壺,非同步。 雖然都能幹活,但響水壺可以在自己完工之後,提示老張水開了。這是普通水壺所不能及的。 同步只能讓呼叫者去輪詢自己(情況2中),造成老張效率的低下。 所謂阻塞非阻塞,僅僅對於老張而言。 立等的老張,阻塞;看電視的老張,非阻塞。 情況1和情況3中老張就是阻塞的,媳婦喊他都不知道。雖然3中響水壺是非同步的,可對於立等的老張沒有太大的意義。所以一般非同步是配合非阻塞使用的,這樣才能發揮非同步的效用。 ——來源網路,作者不明。 作者:愚抄 連結:https://www.zhihu
.com/question/19732473/answer/23434554 來源:知乎 著作權歸作者所有,轉載請聯絡作者獲得授權。

使用的類:
1.ServerSocketChannel類:ServerSocket的代替類,支援非阻塞通訊;
2.SocketChannel類:Socket的代替類,支援非阻塞通訊;
3.Selector:為ServerSocketChannel監控接收連線就緒事件,為SocketChannel監控連線就緒,讀就緒和寫就緒事件。
4.SelectionKey:代表ServerSocketChannel和SocketChannel向Selector註冊事件的控制代碼。

基本思路:
伺服器端:
1.使用ServerSocketChannel類對地址埠號進行繫結,通過open()方法獲取通道,並且設定為非阻塞;
2.通過Selector的open()方法,建立Selector物件;
3.進行註冊連線就緒事件;
3.通過Selector的select()方法返回註冊事件的數目,返回物件為SelectionKey,對SelectionKey進行遍歷,當監聽到連線事件的時候,就使用accept()連線並返回一個SocketChannel物件,對它註冊讀就緒事件;
4.收到訊息之後進行反饋;
客戶端:
1.通過IP和埠號進行連線;
2.通過Selector的open()方法建立該類物件;
3.進行註冊讀事件監聽,讀取伺服器端的反饋;
4.輸出訊息;

程式碼:
伺服器端:

package sency.lay.two;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * 伺服器端
 * 
 * @author sency
 * 
 */
public class NServer {
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    private int port = 8000;
    // 用於編碼解碼物件
    private Charset charse = Charset.forName("GBK");
    public static SelectionKey key = null;


    public NServer() throws IOException {
        // 建立Selector物件
        selector = Selector.open();
        // 開啟一個伺服器端通道
        serverSocketChannel = ServerSocketChannel.open();
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port);
        serverSocketChannel.bind(isa);
        // 設定為非阻塞模式
        serverSocketChannel.configureBlocking(false);
        // 為伺服器通道註冊連線就緒監聽事件
        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("=======Welcome!!!=======");
    }

    public static void main(String args[]) throws IOException {
        new NServer().service();
    }

    private void service() throws IOException {
        // TODO Auto-generated method stub
        // 通過select方法選擇一組鍵,其相應的通道已為 I/O操作準備就緒
        while (selector.select() > 0) {
            Set<SelectionKey> keys = selector.selectedKeys();
            Iterator<SelectionKey> it = keys.iterator();
            while (it.hasNext()) {

                key = it.next();
                // 刪除正在處理的key
                it.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = serverSocketChannel.accept();
                    if (sc.isConnected()) {
                        System.out.println("連線到:"
                                + sc.socket().getInetAddress() + "--"
                                + sc.socket().getPort());
                    }
                    // 設定為非阻塞模式
                    sc.configureBlocking(false);
                    // 為返回的SocketChannel物件註冊讀就緒和寫就緒監聽
                    sc.register(selector, SelectionKey.OP_READ);
                }

                if (key.isReadable()) {
                    try {
                        readMsg(key);
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }

                }

            }
        }
    }

    private void sendEcho(SelectionKey key,String content) throws IOException {
        if(content.length()>0){
            SocketChannel sc = (SocketChannel) key.channel();
            String echo = "#ServerEcho:"+content+"收到!";
            sc.write(charse.encode(echo));
            System.out.println(echo);
        }
    }

    private void readMsg(SelectionKey key) throws IOException {
        // TODO Auto-generated method stub
        // 獲取該key所對應的SocketChannel
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 建立一個ByteBuffer用於存放讀取的資料
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String content = "";
        try {
            while (socketChannel.read(buffer) > 0) {
                // 將極限設定為當前位置,再將當前位置設定為0
                buffer.flip();
                // 對讀取取內容進行解碼
                content += charse.decode(buffer);
            }
            System.out.println("#Client:" + content);
            // 將key對應的Channel設定為準備下一次讀取
            key.interestOps(SelectionKey.OP_READ);
            buffer.clear();
        } catch (IOException e) {
            key.cancel();
            if (key.channel() != null) {
                key.channel().close();
            }
        }
        sendEcho(key,content);
    }
}

客戶端:

package sency.lay.two;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;

/**
 * 客戶端
 * 
 * @author sency
 * 
 */
public class NClient {
    private SocketChannel socketChannel;
    private Selector selector;
    private int port = 8000;
    private Charset charse = Charset.forName("GBK");
    public SelectionKey key = null;

    public NClient() throws IOException {
        selector = Selector.open();
        // 建立SocketChannel
        InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port);
        socketChannel = SocketChannel.open();
        socketChannel.connect(isa);
        socketChannel.configureBlocking(false);
        socketChannel.register(selector, SelectionKey.OP_READ);
    }

    public static void main(String args[]) throws IOException {
        System.out.println("=======客戶端=======");

        new NClient().talk();

    }

    private void talk() throws IOException {
          Scanner scan=new Scanner(System.in);
          new MyThread().start();
          while(scan.hasNextLine()){  
              //讀取鍵盤的輸入  
              String line=scan.nextLine();  
              //將鍵盤的內容輸出到SocketChanenel中  
              socketChannel.write(charse.encode(line));  
            System.out.println("#Client:"+line);
          }  


    }

    class MyThread extends Thread{

        public void run() {
            // TODO Auto-generated method stub
            try {
                while (selector.select() > 0) {

                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    while (it.hasNext()) {

                        key = it.next();
                        it.remove();

                        // TODO Auto-generated method stub
                        if (key.isReadable()) {
                            try {
                                receiveMsg(key);
                            } catch (IOException e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                        }

                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

        }

    }
    private void receiveMsg(SelectionKey key) throws IOException {
        // TODO Auto-generated method stub
        // 獲取該key所對應的SocketChannel
        SocketChannel socketChannel = (SocketChannel) key.channel();
        // 建立一個ByteBuffer用於存放讀取的資料
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        String content = "";
        try {
            while (socketChannel.read(buffer) > 0) {
                // 將極限設定為當前位置,再將當前位置設定為0
                buffer.flip();
                // 對讀取取內容進行解碼
                content += charse.decode(buffer);
            }
            System.out.println(content);
            // 將key對應的Channel設定為準備下一次讀取
            key.interestOps(SelectionKey.OP_READ);

        } catch (IOException e) {
            if (key != null) {
                key.cancel();
                key.channel().close();
            }
        }
    }
}

水平太低啦,大家不要介意,努力學習ing!