1. 程式人生 > >netty之NIO從淺入深、詳細解析。

netty之NIO從淺入深、詳細解析。

BIO:同步、阻塞

NIO:同步、非阻塞

AIO:非同步、非阻塞

 

在學習NIO之前,需要大家先了解一些簡單的概念:

1.cache(快取記憶體)和buffer(緩衝區):

cache快取區和buffer緩衝區都是臨時儲存區。

buffer緩衝區主要存在於RAM中,作為CPU暫時儲存資料的區域(當計算機和其他裝置有不同的速度時,buffer儲存著緩衝的資料,這樣計算機就可以做其他應用了)

cache快取區是一種高速儲存區域,可以在主存或硬碟燈其他獨立儲存區域的一部分。

總結:cache快取記憶體為硬碟快取,buffer緩衝區被稱為記憶體快取。

 

2.阻塞(Block)和非阻塞(Non-Block):

阻塞和非阻塞是程序在訪問資料的時候,資料是否準備就緒的一種處理方式,當資料沒有準備的時候

阻塞:往往需要等待緩衝區中的資料準備好過後才處理其他的事情,否則一直等待在哪裡。

非阻塞:當我們的程序訪問我們的資料緩衝區的時候,如果資料沒有準備好則直接返回,不會等待。如果資料已經準備好,也直接放回。

 

3.同步(Synchronization)和非同步(Asynchronous)的方式:

同步和非同步都是基於應用程式和作業系統處理IO事件所採用的方式。

比如同步:是應用程式要直接參與IO讀寫的操作。

非同步:所有的IO讀寫交給作業系統去處理,應用程式只需要等待通知。

同步方式在處理IO事件的時候,必須阻塞在某個方法上面等待我們的IO事件完成(阻塞IO事件或者通過輪詢IO事件的方式),對於一部來說,所有的IO讀寫都交給了作業系統。這個時候,我們可以去做其他的事情,並不需要去完成真正的IO操作,當操作完成IO後會給我們的應用程式一個通知。

同步:阻塞到IO事件,阻塞到read或者write。這個時候我們就完全不能做自己的事情。讓讀寫方法加入到執行緒裡面,然後阻塞執行緒來實現,對執行緒的效能開銷比較大。

4.BIO(Block IO)於NIO(Non-Block IO)對比:

IO模型

IO

NIO

方式

從硬碟到記憶體

從記憶體到硬碟

通訊

面向流

面向緩衝

處理

阻塞IO(多執行緒)

非阻塞IO(反應堆Reactor)

觸發

選擇器(輪詢機制)

 

5.面向流與面向緩衝:

Java NIO 和 IO 之間第一個最大區別是,IO是面向流的,NIO是面向緩衝區的。Java IO 面向流意味著每次從流中讀取一個或多個位元組,直至讀取所有位元組,他們沒有被快取在任何地方。此外,它不能前後移動流中的資料。如果需要前後移動從流中讀取的資料,需要先將它快取到一個緩衝區。Java NIO 的緩衝導向方法略有不同。資料讀取到一個它稍後處理的緩衝區,需要時可在緩衝區中前後移動。這就增加了處理過程中的靈活性。但是,還需要檢查是否該緩衝區中包含所有您需要處理的資料。而且,需確保當更多的資料讀入緩衝區時,不要覆蓋緩衝區裡尚未處理的資料。

 

6.socket(套接字):Socket clinet = new Socket("localhost",8080)

網路上兩個程式通過一個雙向的通訊連線實現資料的交換,這個連線的一端稱為一個socket。

TCP/IP協議幫我們解決能夠實現兩臺主機之間的程序通訊,網路層的IP地址可以唯一標識網路中的主機,傳輸層的協議+埠可以唯一標識主機中的應用程式(程序),這樣利用三元組(IP地址、協議、埠)就可以標識網路的程序。

NIO各沒款詳細解析

一、深入剖析 Buffer

在談到緩衝區時,我們說緩衝區物件本質上是一個數組,但它其實是一個特殊的陣列,緩衝區物件內建了一些機制,

能夠跟蹤和記錄緩衝區的狀態變化情況,如果我們使用get()方法從緩衝區獲取資料後者使用put()方法把資料寫入緩

衝區,都會引起緩衝區狀態的變化。

在緩衝區中,最重要的屬性有下面三個,它們一起合作完成對緩衝區內部狀態的變化跟蹤:

 

position:指定了下一個將要被寫入或者讀取的元素索引,它的值由get()/put()方法自動更新,在新建立一個Buffer

物件時,position被初始化為0

capacity 陣列容量

limit 可操作範圍

position 標記/遊標 get和put時候到哪個位置 相當於陣列索引 index

flip() 把當前緩衝區固定

二、緩衝區的分配

1、分配指定帶下的緩衝區

2、包裝一個現有的陣列 ByteBuffer.wrap(new byte[10])

3、緩衝區的切片 Slice 0<position<limit<capacity 在position和limit之間分成不同的子緩衝區

4、只讀緩衝區 buffer.asReadOnlyBuffer() 這個緩衝區的資料不能put()只能get()

5、直接緩衝區 ByteBuffer.allocateDirect(1024) Java程式再怎麼分配都是在JVM允許範圍內分配(堆疊區),

類似C語言,直接呼叫C語言模組,最底層。從作業系統中直接分配記憶體,netty中的零拷貝。

6、記憶體對映檔案I/O,在記憶體中可讀可寫,直接持久化硬碟

三、通道 Channel

通道是一個物件,通過它可以讀取和寫入資料,當然了所有資料都通過Buffer物件來處理。我們永遠不會將位元組直接

寫入通道中,相反是將資料寫入包含一個或者多個位元組的緩衝區。同樣不會直接從通道中讀取位元組,而是將資料從通

道讀入緩衝區,再從緩衝區獲取這個位元組。在NIO中,提供了多種通道物件,而所有的通道物件都實現了Channel介面。

 

使用NIO讀取資料

在前面我們說過,任何時候讀取資料,都不是直接從通道讀取,而是從通道讀取到緩衝區。

所以使用NIO讀取資料分為以下三個步驟:

1.從FileInputStream獲取Channel

2.建立Buffer

3.將資料從Channel讀取到Buffer中

 

四、反應堆 Reactor

 

1、阻塞I/O通訊模型

加入現在你對阻塞I/O有一定的瞭解,我們知道阻塞I/O在呼叫InputStream.read()方法時是阻塞的,它會一直

等到資料到來時(或超時)才會返回;同樣,在呼叫ServerSocket.accept()方法時,也會一直阻塞到有客戶端

連線才會返回,每個客戶端連線過來後,服務端都會啟動一個執行緒去處理該客戶端的請求。

缺點:

a.當客戶端多時,會建立大量的處理執行緒。而且每個執行緒都要佔用棧空間和cpu時間

b.阻塞可能帶來頻繁的上下文切換,而且大部分上下文切換是可能無意義的

2、Java NIO原理及通訊模型

Java NIO是在jdk 1.4開始使用的,稱為新的I/O或者是非阻塞式I/O

工作原理:

a.由一個專門的執行緒來處理所有的IO時間,並負責分發。

b.事件驅動機制:事件到的時候觸發,而不是同步的去監視事件

c.執行緒通訊:執行緒之間通過wait,notify等方式通訊,保證每次上下文切換都是有意義的,減少不必要的執行緒切換

(每個執行緒的處理流程大概都是讀取資料、解碼、計算處理、編碼、傳送響應)

五、選擇器 Selector

傳統的Server/Clinet模式會基於trp(Thread pre Request),伺服器會為每個客戶端請求建立一個執行緒,由該執行緒單獨

負責處理一個客戶端請求。這種模式帶來的一個問題是執行緒資料的劇增,大量的執行緒會增大伺服器的開銷。大多數的實現

為了避免這個問題,都採用了執行緒池模型,並設定執行緒池執行緒的最大數量,這又帶來了新的問題,如果執行緒池中有200個線

程,而又200個使用者都在進行大檔案下載,會導致201個使用者的請求無法及時處理,即便第201個使用者只想請求一個幾KB大小的頁面。

NIO中非阻塞I/O採用了基於Reactor模式的工作方式,I/O呼叫不會被阻塞,相反是註冊感興趣的特定I/O事件,如可讀資料

到達,新的套接字連線等等,在發生特定事件時,系統再通知我們。NIO中實現非阻塞I/O的核心物件就是Selector,Selector

就是註冊各種I/O事件池,而且當那些事情發生時,就是這個物件告訴我們所發生的事件。

NIOServer程式碼:

package com.sinosoft.study.nio.demo;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class NIOServer {

    private int port = 8080;
    private Selector selector;

    public NIOServer(int port){
        try{
            this.port = port;
            // ServerSocketChannel是一個可以監聽新進來的TCP連線的通道,就像標準IO中的ServerSocket一樣
            // 獲得一個ServerSocket通道
            ServerSocketChannel server = ServerSocketChannel.open();

            // 預設為阻塞,手動改為非阻塞
            server.configureBlocking(false);

            // 將該通道對於的 ServerSocket 繫結到 port埠號
            server.socket().bind(new InetSocketAddress(port));

            // 獲得一個通道選擇器
            selector = Selector.open();
            /*
             * 將通道選擇器和該通道繫結,併為該通道註冊selectionKey.OP_ACCEPT事件
             * 註冊該事件後,當事件到達的時候,selector.select()會返回,
             * 如果事件沒有到達selector.select()會一直阻塞
             */
            server.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("伺服器準備就緒,監聽埠是:"+ this.port);

        }catch(Exception e){
            e.printStackTrace();
        }
    }

    public void listen(){
        try {
            System.out.println("start server");
            // 輪詢
            while (true) {
                // 當註冊事件到達時,方法返回,否則該方法會一直阻塞
                int wait = this.selector.select();
                if(wait == 0){ continue;}

                // 獲得selector中選中的迭代器,選中註冊的事件
                Set<SelectionKey> keys = this.selector.selectedKeys();
                Iterator<SelectionKey> i = keys.iterator();
                while(i.hasNext()){
                    SelectionKey key = i.next();

                    process(key);
                    i.remove();
                }
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public void process(SelectionKey key) throws Exception {

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        // 客戶端請求連線事件
        if(key.isAcceptable()){
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            // 獲得和客戶端連線的通道
            SocketChannel client = server.accept();
            client.configureBlocking(false);
            // 在客戶端連線成功之後,為了可以接收到客戶端的資訊,需要給通道設定讀的許可權
            client.register(selector,SelectionKey.OP_READ);
        }else if(key.isReadable()){
            SocketChannel client = (SocketChannel) key.channel();
            int len = client.read(buffer);
            if(len > 0){
                buffer.flip();
                String content = new String(buffer.array(),0,len);
                System.out.println(content);
                client.register(selector,SelectionKey.OP_WRITE);
            }
            buffer.clear();

        }else if(key.isWritable()){
            SocketChannel client = (SocketChannel) key.channel();
            buffer = buffer.wrap("Hello Wold".getBytes());
            client.read(buffer);

        }
    }

    public static void main(String[] args) {
        new NIOServer(8080).listen();
    }

}

BIO客戶端程式碼:

package com.sinosoft.study.bio;

import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
import java.util.UUID;

public class BIOClient {
    public static void main(String[] args) throws UnknownHostException, IOException{
        // 獲取一個Socket通道
        Socket client = new Socket("localhost",8080);
        // 輸入流通道開啟
        OutputStream os = client.getOutputStream();

        String uuid = UUID.randomUUID().toString();
        System.out.println("要輸出的內容為:" + uuid);
        os.write(uuid.getBytes());
        os.close();
        client.close();
    }
}

BIO服務端程式碼:

package com.sinosoft.study.bio;

import java.io.IOException;
import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;

public class BIOServer {
    ServerSocket server;
    // 伺服器
    public BIOServer(int port){
        try{
            // 把Socket服務端啟動
            server = new ServerSocket(port);
            System.out.println("BIO服務已啟動,監聽的埠號是:" + port);
        }catch(IOException e){
            e.printStackTrace();
        }
    }
    /**
      * @Description: 開始監聽,並處理邏輯
      * @Param: 
      * @return: 
      * @Author: Mr.WangYu
      * @Date: 2018/11/11
      * @Time: 14:29
      */
    public void listener() throws IOException{
        // 迴圈監聽
        while(true){
            // 等待客戶端的連線,阻塞方法
            Socket client = server.accept();

            InputStream is = client.getInputStream();
            byte []b = new byte[1024];
            int num = is.read(b);
            if (num > 0){
                String msg = new String(b,0,num);
                System.out.println("收到:" + msg);
            }
        }
    }

    public static void main(String[] args) throws IOException{
        new BIOServer(8080).listener();
    }
}

贈品:AIO服務端程式碼

package com.sinosoft.study.aio;

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOServer {

    private int port = 8080;


    public AIOServer(int port){
        this.port = port;
    }
    public void listen(){
        try {
            AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open();
            server.bind(new InetSocketAddress(this.port));
            server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {

                @Override
                public void completed(AsynchronousSocketChannel client, Object attachment) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    client.read(buffer);
                    buffer.flip();
                    System.out.println("111111111111");
                    System.out.println(new String(buffer.array()));
                }

                @Override
                public void failed(Throwable exc, Object attachment) {

                }
            });
        }catch (Exception e){
            e.printStackTrace();
        }
        try {
            Thread.sleep(Integer.MAX_VALUE);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        new AIOServer(8080).listen();
    }
}