1. 程式人生 > >【Java IO模式】Java BIO NIO AIO總結

【Java IO模式】Java BIO NIO AIO總結

一、同步與非同步、阻塞與非阻塞

1、同步與非同步

同步與非同步的區別在於,資料從核心空間拷貝到使用者空間是否由使用者執行緒完成。
– 對於同步來說,分阻塞和非阻塞兩種。阻塞的情況,一個執行緒維護一個連結,該執行緒完成資料的讀寫與處理的全部過程,並且資料的讀寫是阻塞的。 對於非阻塞來說,雖然讀寫的過程不會阻塞當前執行緒,立即返回,但是使用者執行緒(Selector選擇器)仍然要不斷主動去判斷資料是否“就緒”(感興趣的事件是否發生,具體可參考後文對NIO的描述),當出現可以操作的IO時,進行資料的讀寫並處理,此時還是會阻塞等待核心複製資料到使用者程序。他與同步BIO的區別是後者使用一個連線全程等待;
可參考下圖(同步非阻塞):
這裡寫圖片描述

可以看到,在將資料從核心拷貝到使用者空間這一過程,是由使用者執行緒阻塞完成的。

–對於非同步來說,使用者進行讀或者寫後,將立刻返回,由核心去完成資料讀取以及拷貝工作,完成後通知使用者,並執行回撥函式(使用者提供的callback),此時資料已從核心拷貝到使用者空間,使用者執行緒只需要對資料進行處理即可,不需要關注讀寫,使用者不需要等待核心對資料的複製操作,使用者在得到通知時資料已經被複制到使用者空間
可參考下圖(非同步非阻塞):
這裡寫圖片描述

可發現,使用者在呼叫之後,立即返回,由核心完成資料的拷貝工作,並通知使用者執行緒,進行回撥。

(IO“就緒”和“完成”的區別:就緒指的是還需要使用者自己去處理,完成指的是核心幫助完成了,使用者不用關心IO過程,只需要提供回撥函式。)
(一般來說,IO操作都分為兩個階段,就拿套介面的輸入操作來說,它的兩個階段主要是:1)等待網路資料到來,當分組到來時,將其拷貝到核心空間的臨時緩衝區中2)將核心空間臨時緩衝區中的資料拷貝到使用者空間緩衝區中)

(網上還有一種對非同步同步的解釋也很形象:同步和非同步關注的是訊息通訊機制synchronous communication/ asynchronous communication。所謂同步,就是在發出一個呼叫時,在沒有得到結果之前,該呼叫就不返回。但是一旦呼叫返回,就得到返回值了。換句話說,就是由呼叫者主動等待這個呼叫的結果。而非同步則是相反,呼叫在發出之後,這個呼叫就直接返回了,所以沒有返回結果。換句話說,當一個非同步過程呼叫發出後,呼叫者不會立刻得到結果。而是在呼叫發出後,被呼叫者通過狀態、通知來通知呼叫者,或通過回撥函式處理這個呼叫。)

2、阻塞與非阻塞

阻塞與非阻塞IO,指的是在IO讀寫過程中有沒有阻塞執行緒,即當沒有可讀資料時,方法是否即刻返回。
例如在BIO中,使用流的方式進行讀寫操作,而流的讀寫操作是阻塞的,例如inpustrem.readline()函式,當沒有有效資料可讀時,執行緒將阻塞在該語句處。寫操作是同樣的道理。如下圖所示:
這裡寫圖片描述


而在NIO中,使用channel與buffer的方式進行資料的讀寫,只有在有可讀或者可寫資料時,才會將資料從核心空間讀/寫入使用者空間緩衝區,不會造成執行緒的阻塞。具體可參考後文對BIO、NIO的描述。如下圖所示:
這裡寫圖片描述

3、IO模式

在Java中,使用socket進行網路通訊,IO有如下模式:BIO、NIO、AIO。
分別代表著:同步阻塞IO、同步非阻塞IO、非同步非阻塞IO

二、BIO

1 概念描述

指阻塞式IO通訊模式。如下圖所示為BIO模式示意圖:
這裡寫圖片描述
每建立一個Socket連線時,同時建立一個新執行緒對該Socket進行單獨通訊(採用阻塞的方式通訊)。這種方式具有很高的響應速度,並且控制起來也很簡單,在連線數較少的時候非常有效,但是如果對每一個連線都產生一個執行緒的無疑是對系統資源的一種浪費,如果連線數較多將會出現資源不足的情況。

2 特點

他有以下兩個特點:
(a)使用一個獨立的執行緒維護一個socket連線,隨著連線數量的增多,對虛擬機器造成一定壓力;
(b)使用流來讀取資料,流是阻塞的,當沒有可讀/可寫資料時,執行緒等待,會造成資源的浪費;

3 程式碼實現

我們使用Socket程式設計的方式,用程式碼來體會一下BIO模式通訊方式
服務端監聽執行緒:

package study20170324;

/**
 * Created by apple on 17/3/24.
 */

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * 開啟服務監聽執行緒,當收到連線請求後,開啟新的執行緒進行處理
 */
public class ServerThread implements Runnable{

    @Override
    public void run() {
        try {
            ServerSocket serverSocket = new ServerSocket(Constant.PORT);
            while (true){
                Socket socket = serverSocket.accept();
                new Thread(new ServerProcessThread(socket)).start();//開啟新的執行緒進行連線請求的處理
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

服務端資料處理執行緒:

package study20170324;

/**
 * Created by apple on 17/3/24.
 */

import java.io.*;
import java.net.Socket;

/**
 * 服務端收到連線請求後,處理請求的執行緒,阻塞式IO
 */
public class ServerProcessThread implements Runnable {

    private Socket socket;
    public ServerProcessThread(Socket socket){
        this.socket = socket;
    }
    @Override
    public void run() {

        //獲取客戶端的資料,並寫回
        //等待響應

        try {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String line = "";
            String requestStr = "";
            System.out.println("來自客戶端的資料:");
            while((line = bufferedReader.readLine()) != null){
                requestStr += line;
                System.out.println(line);
            }
            Writer writer = new OutputStreamWriter(socket.getOutputStream());
            writer.write("data from server " + requestStr + "\r\n");
            writer.flush();
            writer.close();
            bufferedReader.close();
            socket.close();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端處理執行緒

package study20170324;

import java.io.*;
import java.net.Socket;

/**
 * Created by apple on 17/3/24.
 */

/**
 * 維護客戶端socket連線的執行緒,阻塞式IO
 */
public class ClientProcessThread implements Runnable {

    private Socket socket;
    public ClientProcessThread(Socket socket){
        this.socket = socket;
    }

    @Override
    public void run() {
        //寫資料,等待響應,輸出響應

        String requestStr = "data from client \r\n";
        try {
            Writer writer = new OutputStreamWriter(socket.getOutputStream());
            writer.write(requestStr);
            writer.flush();
            socket.shutdownOutput();
            //等待響應
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            String line;
            System.out.println("來自服務端的響應:");
            while((line = bufferedReader.readLine()) != null){
                System.out.println(line);
            }
            writer.close();
            bufferedReader.close();
            socket.close();


        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

常量類:

package study20170324;

/**
 * Created by apple on 17/3/24.
 */
public class Constant {
    public static final String HOST = "127.0.0.1";
    public static final int PORT = 8080;
}

主執行類:

package study20170324;

import java.io.IOException;
import java.net.Socket;

/**
 * Created by apple on 17/3/24.
 */
public class ClientMain {

    public static void main(String[] args) {
        //開啟服務
        System.out.println("開啟服務,監聽埠:" + Constant.PORT);
        new Thread(new ServerThread()).start();

        //建立一個socket客戶端,發起請求
        System.out.println("客戶端,請求連線,併發送資料");
        try {
            Socket socket = new Socket(Constant.HOST,Constant.PORT);
            new Thread(new ClientProcessThread(socket)).start();//開啟新的執行緒處理socket連線
        } catch (IOException e) {
            e.printStackTrace();
        }

    }
}

最終的執行結果:
這裡寫圖片描述

三、NIO

1 概念描述

指的是非阻塞式IO通訊模式
針對於BIO的兩個特點,其實也是兩個缺點,Java提供了NIO通訊模式的實現。相對於BIO來說,NIO模式即非阻塞IO。伺服器端儲存一個Socket連線列表,然後對這個列表進行輪詢,如果發現某個Socket埠上有資料可讀時(讀就緒),則呼叫該socket連線的相應讀操作;如果發現某個 Socket埠上有資料可寫時(寫就緒),則呼叫該socket連線的相應寫操作;如果某個埠的Socket連線已經中斷,則呼叫相應的析構方法關閉該埠。這樣能充分利用伺服器資源,效率得到了很大提高。Java中使用Selector、Channel、Buffer來實現上述效果,如下圖所示:
這裡寫圖片描述

執行緒中包含一個Selector物件,他相當於一個通道管理器,可以實現在一個單獨執行緒中處理多個通道的目的,減少執行緒的建立數量。遠端連線對應一個channel,資料的讀寫通過buffer均在同一個channel中完成,並且資料的讀習是非阻塞的。通道建立後需要註冊在selector中,同時需要為該通道註冊感興趣事件(客戶端連線服務端事件、服務端接收客戶端連線事件、讀事件、寫事件),selector執行緒需要採用輪訓的方式呼叫selector的select函式,直到所有註冊通道中有興趣的事件發生,則返回,否則一直阻塞。而後迴圈處理所有就緒的感興趣事件。以上步驟解決BIO的兩個瓶頸:(1)不必對每個連線分別建立執行緒;(2)資料讀寫非阻塞
下面對以下三個概念做一個簡單介紹,Java NIO由以下三個核心部分組成:

-(a)selector
Selector允許單執行緒處理多個Channel。如果你的應用打開了多個連線(通道),但每個連線的流量都很低,使用Selector就會很方便。要使用Selector,得向Selector註冊Channel,然後呼叫他的select方法,這個方法會一直阻塞到某個註冊的通道有事件就緒。一旦這個方法返回,執行緒就可以處理這些事件,事件的例子入有新連線接進來,資料接收等。
-(b)channel與buffer
基本上,所有的IO在NIO中都從一個Channel開始。Channel有點像流。資料可以從channel讀到buffer,也可以從budder寫到channel。
channel和buffer有好幾種類型。下面是Java NIO中的一些主要channel的實現:
FileChannel
DatagramChannel
SocketChannel
ServerSocketChannel
正如你所看到的,這些通道涵蓋了UDP和TCP網路IO,以及檔案IO。
以下是Java NIO裡關鍵的buffer實現:
ByteBuffer
CharBuffer
FloatBuffer
IntBuffer
LongBuffer
ShortBuffer

2 特點

NIO的特點:
(a)一個執行緒可以處理多個通道,減少執行緒建立數量;
(b)讀寫非阻塞,節約資源:沒有可讀/可寫資料時,不會發生阻塞導致執行緒資源的浪費

3 程式碼描述

使用Java程式碼實現NIO的通訊方式
基礎類:

package study20170325;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

/**
 * Created by apple on 17/3/25.
 */
public class NIOBase {

    // 執行緒中的通道管理器
    public Selector selector;
    public String from,to;//server or client

    public NIOBase(String from,String to){
        this.from = from;
        this.to = to;
    }

    /**
     * 初始化 該執行緒中的通道管理器Selector
     */
    public void initSelector() throws IOException {
        this.selector = Selector.open();
    }


    /**
     * 採用輪詢的方式監聽selector上是否有需要處理的事件,如果有,則迴圈處理
     * 這裡主要監聽連線事件以及讀事件
     */
    public void listen() throws IOException {
        //輪詢訪問select
        while(true){
            //當註冊的事件到達時,方法返回;否則將一直阻塞
            selector.select();
            //獲得selector中選中的項的迭代器,選中的項為註冊的事件
            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            //迴圈處理註冊事件
            /**
             * 一共有四種事件:
             * 1. 服務端接收客戶端連線事件: SelectionKey.OP_ACCEPT
             * 2. 客戶端連線服務端事件:    SelectionKey.OP_CONNECT
             * 3. 讀事件:                SelectionKey.OP_READ
             * 4. 寫事件:                SelectionKey.OP_WRITE
             */
            while(iterator.hasNext()){
                SelectionKey key = iterator.next();
                //手動刪除已選的key,以防重複處理
                iterator.remove();
                //判斷事件性質
                if (key.isAcceptable()){//服務端接收客戶端連線事件
                    accept(key);
                }else if (key.isReadable()){//讀事件
                    read(key);
                }else if (key.isConnectable()) {//客戶端連線事件
                    connect(key);
                }
            }
        }
    }

    /**
     * 當監聽到讀事件後的處理函式
     * @param key 事件key,可以從key中獲取channel,完成事件的處理
     */
    public void read(SelectionKey key) throws IOException {
        //step1. 得到事件發生的通道
        SocketChannel socketChannel = (SocketChannel) key.channel();

        //step2. 建立讀取的緩衝區.將資料讀取到緩衝區中
        ByteBuffer byteBuffer = ByteBuffer.allocate(10);
        int len = socketChannel.read(byteBuffer);
        String msg = "";
        byte[] arr = null;
        while (len > 0){
            byteBuffer.flip();
            arr = new byte[len];
            byteBuffer.get(arr,0,len);
            msg += new String(arr);
            byteBuffer.clear();
            len = socketChannel.read(byteBuffer);
        }
        System.out.println(from + " received data from " + to + ":" + msg);
        //step3. 再將訊息回發給客戶端
        if (from.equals("server"))socketChannel.write(ByteBuffer.wrap(new String(" server send some data back to client").getBytes()));
    }

    /**
     * 當監聽到服務端接收客戶端連線事件後的處理函式
     * @param key 事件key,可以從key中獲取channel,完成事件的處理
     */
    public void accept(SelectionKey key) throws IOException{}

    /**
     * 當監聽到客戶端連線事件後的處理函式
     * @param key 事件key,可以從key中獲取channel,完成事件的處理
     */
    public void connect(SelectionKey key) throws IOException{}
}

服務端執行緒類:

package study20170325;

/**
 * Created by apple on 17/3/25.
 */

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

/**
 * 採用NIO的方式,開啟服務執行緒
 * 該執行緒存在一個Selector,通道管理器,管理所有的channel
 * step1.服務初始化時,會初始化一個ServerSocektChannel,並註冊到Selector中,註冊"服務端接收客戶端連線"事件
 *
 * step2.之後開啟監聽,輪詢判斷Selector上是否有需要處理的事件,如果有則迴圈處理;
 *
 * step2.1 客戶端連線事件:在處理的過程中,獲取與客戶端連線的通道 socketChannel,並註冊到Selector中,通過該通道,與客戶端進行讀寫操作
 *
 * step2.2 讀事件,利用讀取緩衝區與通道結合進行
 */
public class NIOServerThread extends NIOBase implements Runnable{

    public NIOServerThread(String from, String to) {
        super(from, to);
    }

    //服務端執行緒中的通道管理器,使用它,可以在同一個執行緒中管理多個通道



    @Override
    public void run() {
        try {
            initSelector();//初始化通道管理器Selector
            initServer(Constant.IP,Constant.PORT);//初始化ServerSocketChannel,開啟監聽
            listen();//輪詢處理Selector選中的事件
        } catch (IOException e) {
            e.printStackTrace();
        }

    }


    /**
     * 獲得一個ServerSocket通道,並通過port對其進行初始化
     * @param port    監聽的埠號
     */
    private void initServer(String ip,int port) throws IOException {
        //step1. 獲得一個ServerSocketChannel
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

        //step2. 初始化工作
        serverSocketChannel.configureBlocking(false);//設定通道為非阻塞
        serverSocketChannel.socket().bind(new InetSocketAddress(ip,port));

        //step3. 將該channel註冊到Selector上,併為該通道註冊SelectionKey.OP_ACCEPT事件
        //這樣一來,當有"服務端接收客戶端連線"事件到達時,selector.select()方法會返回,否則將一直阻塞
        serverSocketChannel.register(this.selector,SelectionKey.OP_ACCEPT);
    }


    /**
     * 當監聽到服務端接收客戶端連線事件後的處理函式
     * @param key 事件key,可以從key中獲取channel,完成事件的處理
     */
    @Override
    public void accept(SelectionKey key) throws IOException {

        //step1. 獲取serverSocketChannel
        ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();

        //step2. 獲得和客戶端連線的socketChannel
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);//設定為非阻塞

        //step3. 通過socketChannel給客戶端傳送資訊
        socketChannel.write(ByteBuffer.wrap(new String("server has a connection with client").getBytes()));

        //step4. 註冊該socketChannel
        socketChannel.register(selector,SelectionKey.OP_READ);//為了接收客戶端的訊息,註冊讀事件
    }


}

客戶端執行緒類:

package study20170325;

/**
 * Created by apple on 17/3/25.
 */

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

/**
 * NIO 客戶端執行緒
 */
public class NIOClientThread extends NIOBase implements Runnable{


    public NIOClientThread(String from, String to) {
        super(from, to);
    }

    @Override
    public void run() {
        try {
            initSelector();//初始化通道管理器
            initClient(Constant.IP,Constant.PORT);//初始化客戶端連線scoketChannel
            listen();//開始輪詢處理事件
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    /**
     * 獲得一個SocketChannel,並對該channel做一些初始化工作,並註冊到
     * @param ip
     * @param port
     */
    private void initClient(String ip,int port) throws IOException {
        //step1. 獲得一個SocketChannel
        SocketChannel socketChannel = SocketChannel.open();

        //step2. 初始化該channel
        socketChannel.configureBlocking(false);//設定通道為非阻塞


        //step3. 客戶端連線伺服器,其實方法執行並沒有實現連線,需要再listen()方法中呼叫channel.finishConnect()方法才能完成連線
        socketChannel.connect(new InetSocketAddress(ip,port));

        //step4. 註冊該channel到selector中,併為該通道註冊SelectionKey.OP_CONNECT事件和SelectionKey.OP_READ事件
        socketChannel.register(this.selector,SelectionKey.OP_CONNECT|SelectionKey.OP_READ);
    }

    /**
     * 當監聽到客戶端連線事件後的處理函式
     * @param key 事件key,可以從key中獲取channel,完成事件的處理
     */
    @Override
    public void connect(SelectionKey key) throws IOException {
        super.connect(key);
        //step1. 獲取事件中的channel
        SocketChannel socketChannel = (SocketChannel) key.channel();

        //step2. 如果正在連線,則完成連線
        if (socketChannel.isConnectionPending()){
            socketChannel.finishConnect();
        }
        socketChannel.configureBlocking(false);//將連線設定為非阻塞
        //step3. 連線後,可以給服務端傳送訊息
        socketChannel.write(ByteBuffer.wrap(new String("client send some data to server").getBytes()));

    }
}

常量類:

package study20170325;

/**
 * Created by apple on 17/3/25.
 */
public class Constant {
    public static final int PORT = 8080;
    public static final String IP = "127.0.0.1";
}

執行主類:

package study20170325;

/**
 * Created by apple on 17/3/25.
 */

public class NIOMain {
    public static void main(String[] args) {

        new Thread(new NIOServerThread("server","client")).start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        new Thread(new NIOClientThread("client","server")).start();

    }
}

執行結果為:
這裡寫圖片描述

四、AIO

非同步非阻塞IO,與NIO的區別可參考 非同步/同步介紹。
可以參考之前對vertx的學習筆記,vertx相當於node在java上的實現,採用的是AIO的模式,即非同步非阻塞IO。