1. 程式人生 > >NIO(三):Selector選擇器

NIO(三):Selector選擇器

一.堵塞式與非堵塞式

在傳統IO中,將資料由當前執行緒從客戶端傳入服務端,由服務端的核心進行判斷傳過來的資料是否合法,核心中是否存在資料。

 

如果不存在資料 ,並且資料並不合法,當前執行緒將會堵塞等待。當前執行緒將無法進行下一步傳輸,進行排隊現象。降低系統性能。

為了解決這一步問題,呼叫資源開闢多個執行緒傳輸。

 

 雖然執行緒的開闢解決了部分堵塞排隊的問題,但由於並沒有治理根本堵塞的原因,執行緒數量也是有限的。總會有堵塞的執行緒 ,形成排隊現象。

為了根本解決堵塞的問題。NIO的非堵塞式成為了主要的傳輸方式。

在客戶端和服務端之間將通道註冊到selector選擇器,由選擇器進行監聽channel是否進行什麼操作(read()or write())。

 

當資料就緒或者準備完成時,由selector進行分配到服務端的一個(或多個)執行緒上進行相關執行操作。

 

 在IO的堵塞後無腦呼叫執行緒下。NIO是在準備完成時,才被selector選擇分配到一個或者多個執行緒上傳輸並被複制到核心地址空間中,由於資料已準備完成或者已就緒,核心就無須被堵塞。

 

 

二.Selector(選擇器)

也稱多路複用器,多條channel複用selector。channe通過註冊到selector ,使selector對channel進行監聽,

  實現儘可能少的執行緒管理多個連線。減少了 執行緒的使用,降低了因為執行緒的切換引起的不必要額資源浪費和多餘的開銷。

  也是網路傳輸非堵塞的核心元件。

  

三.Selector的使用

分為客戶端和服務端兩部分:

先實現客戶端吧:

  流程: 獲取通道繫結主機埠 --> 切換非堵塞狀態  --> 開闢buffer容量  -->  將當前時間作為資料寫入buffer待傳  --> 切換讀寫方式flip()  --> 寫入通道 -->清空並關閉

 1  /*
 2     * 客戶端傳送資料 通過channel通道
 3     * */
 4     @Test
 5     public void Client() throws IOException {
 6 
 7         //獲取channel通道   並設定主機號和埠號
 8         SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));
 9 
10         //因為使用非阻塞NIO  所以必須切換為非阻塞
11         socketChannel.configureBlocking(false);   //預設為true 需要改為非堵塞的
12 
13         //開闢緩衝區進行儲存資料
14         ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
15 
16         //準備工作就緒後,準備傳送資料給服務端
17         //列印當前日期轉為Byte資料傳出
18         byteBuffer.put(new Date().toString().getBytes());
19         //切換讀寫模式
20         byteBuffer.flip();
21         //寫入通道
22         socketChannel.write(byteBuffer);
23         //完畢時,清除緩衝區內容
24         byteBuffer.clear();
25 
26     //====================
27         //關閉相關流
28         socketChannel.close();
29 
30     }

 

在獲取當前時間是用的new Date();還可以使用java8的獲取時間的方法。

LocalDateTime.now().toString().getBytes()  //轉為Byte位元組

 

 因為是網路傳輸的心形式,所以在獲取channel時,使用SocketChannel.open方法。實現方法:

 1   public static SocketChannel open(SocketAddress remote)
 2         throws IOException
 3     {
 4         SocketChannel sc = open();
 5         try {
 6             sc.connect(remote);   //開啟一個新的channel時,繫結連線到主機和埠上
 7         } catch (Throwable x) {
 8             try {
 9                 sc.close();  //異常時關閉連線
10             } catch (Throwable suppressed) {
11                 x.addSuppressed(suppressed);
12             }
13             throw x;
14         }
15         assert sc.isConnected();
16         return sc;
17     }

 

 new InetSocketAddress例項建立主機和埠。

   */
    public InetSocketAddress(String hostname, int port) {
        checkHost(hostname);    //檢查主機號是否為空 為空返回異常。
        InetAddress addr = null;
        String host = null;
        try {
            addr = InetAddress.getByName(hostname);
        } catch(UnknownHostException e) {
            host = hostname;
        }
        holder = new InetSocketAddressHolder(host, addr, checkPort(port));  //檢查埠。
    }


//檢查埠方法  
private static int checkPort(int port) {
if (port < 0 || port > 0xFFFF)
throw new IllegalArgumentException("port out of range:" + port);
return port;
}

//檢查主機號方法
private static String checkHost(String hostname) {
if (hostname == null)
throw new IllegalArgumentException("hostname can't be null");
return hostname;
}
 

 

 

服務端:

  流程:使用ServerSocketChannel 的方法獲取服務端額channel  --> 切換為堵塞狀態 --> 為buffer分配容量 --> 繫結埠號 --> 獲取selector選擇器 --> channel註冊進選擇器中,並進行監聽 -->  選擇器進行輪詢,進行下一步讀寫操作。

 1  /*
 2     * 服務端接收客戶端傳來的資料
 3     * */
 4     @Test
 5     public void server() throws IOException {
 6 
 7         //獲取channel通道
 8         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
 9         //切換為非堵塞狀態
10         serverSocketChannel.configureBlocking(false);
11         //分配服務端的緩衝區
12         ByteBuffer serverByteBuffer = ByteBuffer.allocate(1024);
13         //將客戶端的InetSocketAddress繫結到通道,不繫結 不統一將獲取不到資料
14         serverSocketChannel.bind(new InetSocketAddress(8080));
15         //獲取選擇器
16         Selector selector = Selector.open();
17         //將通道註冊到選擇器中,並且制定監聽方式
18         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
19         //進行輪詢選擇器上就緒成功的事件  當存在就緒成功的及進行下一步
20         while (selector.select() > 0){
21             //對已存在的就緒事件進行迭代
22             Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();
23 
24             //有元素就進行下一步
25             while (selectionKeyIterator.hasNext()){
26                 //獲取到就緒事件
27                 SelectionKey next = selectionKeyIterator.next();
28 
29                 //對獲取到的就緒事件判斷是何種型別
30                 if (next.isAcceptable()){
31 
32                     //獲取連線
33                     SocketChannel accept = serverSocketChannel.accept();
34 
35                     //將獲取到的連線切換為非堵塞模式
36                     accept.configureBlocking(false);
37 
38                     //將獲取到的連結 註冊金selector
39                     accept.register(selector,SelectionKey.OP_READ);
40 
41                     //判斷是否準備好讀
42                 }else if (next.isReadable()){
43 
44                     //獲取已就緒的通道
45                     SocketChannel channel = (SocketChannel) next.channel();
46 
47                     //分配緩衝區
48                     ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
49 
50                     //讀取資料
51                     int length = 0 ;
52                     while ((length = channel.read(byteBuffer)) > 0){
53                         byteBuffer.flip();
54                         System.out.println(new String(byteBuffer.array(),0,length));
55                         byteBuffer.clear();
56                     }
57 
58 
59                 }
60 
61                 //完成傳輸需要取消選擇鍵,防止下次出問題
62                 selectionKeyIterator.remove();
63 
64             }
65         }
66 
67 
68     }

 

如何獲取選擇器?

Selector selector = Selector.open();

 

 

實現過程:

 public static Selector open() throws IOException {
        return SelectorProvider.provider().openSelector();
    }



//首先進入此方法判斷是否存在選擇器
 public static SelectorProvider provider() {
        synchronized (lock) {
            if (provider != null)  //第一次為false
                return provider;
            return AccessController.doPrivileged(
                new PrivilegedAction<SelectorProvider>() {
                    public SelectorProvider run() {
                            if (loadProviderFromProperty())
                                return provider;
                            if (loadProviderAsService())
                                return provider;
                            provider = sun.nio.ch.DefaultSelectorProvider.create();
                            return provider;
                        }
                    });
        }
    }


//false時 跳入如下方法。
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
 

 

隨後將獲取到的通道註冊到獲取到的選擇器中,在註冊時給定監聽方式:

 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);  //可多選監聽操作項

selectionKey中定義了四個可操作項:

  • OP_READ  可讀就緒

  • OP_WRITE  可寫就緒

  • OP_CONNECT  連線就緒

  • OP_ACCEPT  接收就緒

 

迭代key中已就緒的元素。

Iterator<SelectionKey> selectionKeyIterator = selector.selectedKeys().iterator();

 

獲取到當前就緒事件叢迭代器中獲取。

selectionKeyIterator.next()

 

 

selectionKey包含四個方法:

  • isReadable():測試此選擇鍵是否可讀  

  • isWritable():測試此選擇鍵是否可寫

  • isConnectable():測試此選擇鍵是否完成

  • isAcceptable():測試此選擇鍵是否可以接受一個新的連線

 通過這些相應的方法,單獨判斷是否可以讀寫,和進行操作。

 

最後取消選擇鍵,防止下次獲取出現異常情況。(第一次判斷可能會為true)

selectionKeyIterator.remove();

 

 

四.附加

在上面的例子中,把客戶端的程式碼進行稍微改寫一下,使之能夠無限輸入,並通過傳輸列印在服務端中。

public static void main(String[] args) throws IOException {
        //獲取channel通道   並設定主機號和埠號
        SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1",8080));

        //因為使用非阻塞NIO  所以必須切換為非阻塞
        socketChannel.configureBlocking(false);

        //開闢緩衝區進行儲存資料
        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);

        //附加輸入:
        Scanner scanner = new Scanner(System.in);
        //通過控制檯鍵入資料
        while (scanner.hasNext()){
            String str = scanner.next();
            //準備工作就緒後,準備傳送資料給服務端
            //列印當前日期轉為Byte資料傳出
            byteBuffer.put((new Date().toString()+":--->"+str).getBytes());
            //切換讀寫模式
            byteBuffer.flip();
            //寫入通道
            socketChannel.write(byteBuffer);
            //完畢時,清除緩衝區內容
            byteBuffer.clear();
        }

    }

 

由於掃描流(scanner)不能用於測試類,所以在main方法下進行測試:

每次輸入的內容都會被轉為Byte位元組進行傳輸。

客戶端輸入結果:

 

 服務端輸出結果:

 

 每輸入一次便傳輸一次。

//完成傳輸需要取消選擇鍵,防止下次出問題
selectionKeyIterator.remove(