1. 程式人生 > >Java網路程式設計與NIO詳解2:JAVA NIO 一步步構建I/O多路複用的請求模型

Java網路程式設計與NIO詳解2:JAVA NIO 一步步構建I/O多路複用的請求模型

微信公眾號【黃小斜】作者是螞蟻金服 JAVA 工程師,專注於 JAVA 後端技術棧:SpringBoot、SSM全家桶、MySQL、分散式、中介軟體、微服務,同時也懂點投資理財,堅持學習和寫作,相信終身學習的力量!關注公眾號後回覆”架構師“即可領取 Java基礎、進階、專案和架構師等免費學習資料,更有資料庫、分散式、微服務等熱門技術學習視訊,內容豐富,兼顧原理和實踐,另外也將贈送作者原創的Java學習指南、Java程式設計師面試指南等乾貨資源。

當前環境

  1. jdk == 1.8

程式碼地址

git 地址:https://github.com/jasonGeng88/java-network-programming

知識點

  • nio 下 I/O 阻塞與非阻塞實現
  • SocketChannel 介紹
  • I/O 多路複用的原理
  • 事件選擇器與 SocketChannel 的關係
  • 事件監聽型別
  • 位元組緩衝 ByteBuffer 資料結構

場景

接著上一篇中的站點訪問問題,如果我們需要併發訪問10個不同的網站,我們該如何處理?

在上一篇中,我們使用了java.net.socket類來實現了這樣的需求,以一執行緒處理一連線的方式,並配以執行緒池的控制,貌似得到了當前的最優解。可是這裡也存在一個問題,連線處理是同步的,也就是併發數量增大後,大量請求會在佇列中等待,或直接異常丟擲。

為解決這問題,我們發現元凶處在“一執行緒一請求”上,如果一個執行緒能同時處理多個請求,那麼在高併發下效能上會大大改善。這裡就借住 JAVA 中的 nio 技術來實現這一模型。

nio 的阻塞實現

關於什麼是 nio,從字面上理解為 New IO,就是為了彌補原本 I/O 上的不足,而在 JDK 1.4 中引入的一種新的 I/O 實現方式。簡單理解,就是它提供了 I/O 的阻塞與非阻塞的兩種實現方式(當然,預設實現方式是阻塞的。)。

下面,我們先來看下 nio 以阻塞方式是如何處理的。

建立連線

有了上一篇 socket 的經驗,我們的第一步一定也是建立 socket 連線。只不過,這裡不是採用 new socket() 的方式,而是引入了一個新的概念 SocketChannel。它可以看作是 socket 的一個完善類,除了提供 Socket 的相關功能外,還提供了許多其他特性,如後面要講到的向選擇器註冊的功能。

類圖如下: 

建立連線程式碼實現:

// 初始化 socket,建立 socket 與 channel 的繫結關係
SocketChannel socketChannel = SocketChannel.open();
// 初始化遠端連線地址
SocketAddress remote = new InetSocketAddress(this.host, port);
// I/O 處理設定阻塞,這也是預設的方式,可不設定
socketChannel.configureBlocking(true);
// 建立連線
socketChannel.connect(remote);

獲取 socket 連線

因為是同樣是 I/O 阻塞的實現,所以後面的關於 socket 輸入輸出流的處理,和上一篇的基本相同。唯一差別是,這裡需要通過 channel 來獲取 socket 連線。

  • 獲取 socket 連線
Socket socket = socketChannel.socket();
  • 處理輸入輸出流
PrintWriter pw = getWriter(socketChannel.socket());
BufferedReader br = getReader(socketChannel.socket());

完整示例

package com.jason.network.mode.nio;

import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;

import java.io.*;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;

public class NioBlockingHttpClient {

    private SocketChannel socketChannel;
    private String host;

    public static void main(String[] args) throws IOException {

        for (String host: HttpConstant.HOSTS) {

            NioBlockingHttpClient client = new NioBlockingHttpClient(host, HttpConstant.PORT);
            client.request();

        }

    }

    public NioBlockingHttpClient(String host, int port) throws IOException {
        this.host = host;
        socketChannel = SocketChannel.open();
        socketChannel.socket().setSoTimeout(5000);
        SocketAddress remote = new InetSocketAddress(this.host, port);
        this.socketChannel.connect(remote);
    }

    public void request() throws IOException {
        PrintWriter pw = getWriter(socketChannel.socket());
        BufferedReader br = getReader(socketChannel.socket());

        pw.write(HttpUtil.compositeRequest(host));
        pw.flush();
        String msg;
        while ((msg = br.readLine()) != null){
            System.out.println(msg);
        }
    }

    private PrintWriter getWriter(Socket socket) throws IOException {
        OutputStream out = socket.getOutputStream();
        return new PrintWriter(out);
    }

    private BufferedReader getReader(Socket socket) throws IOException {
        InputStream in = socket.getInputStream();
        return new BufferedReader(new InputStreamReader(in));
    }
}

nio 的非阻塞實現

原理分析

nio 的阻塞實現,基本與使用原生的 socket 類似,沒有什麼特別大的差別。

下面我們來看看它真正強大的地方。到目前為止,我們將的都是阻塞 I/O。何為阻塞 I/O,看下圖:

我們主要觀察圖中的前三種 I/O 模型,關於非同步 I/O,一般需要依靠作業系統的支援,這裡不討論。

從圖中可以發現,阻塞過程主要發生在兩個階段上:

  • 第一階段:等待資料就緒;
  • 第二階段:將已就緒的資料從核心緩衝區拷貝到使用者空間;

這裡產生了一個從核心到使用者空間的拷貝,主要是為了系統的效能優化考慮。假設,從網絡卡讀到的資料直接返回給使用者空間,那勢必會造成頻繁的系統中斷,因為從網絡卡讀到的資料不一定是完整的,可能斷斷續續的過來。通過核心緩衝區作為緩衝,等待緩衝區有足夠的資料,或者讀取完結後,進行一次的系統中斷,將資料返回給使用者,這樣就能避免頻繁的中斷產生。

瞭解了 I/O 阻塞的兩個階段,下面我們進入正題。看看一個執行緒是如何實現同時處理多個 I/O 呼叫的。從上圖中的非阻塞 I/O 可以看出,僅僅只有第二階段需要阻塞,第一階段的資料等待過程,我們是不需要關心的。不過該模型是頻繁地去檢查是否就緒,造成了 CPU 無效的處理,反而效果不好。如果有一種類似的好萊塢原則— “不要給我們打電話,我們會打給你” 。這樣一個執行緒可以同時發起多個 I/O 呼叫,並且不需要同步等待資料就緒。在資料就緒完成的時候,會以事件的機制,來通知我們。這樣不就實現了單執行緒同時處理多個 IO 呼叫的問題了嗎?即所說的“I/O 多路複用模型”。


廢話講了一大堆,下面就來實際操刀一下。

建立選擇器

由上面分析可以,我們得有一個選擇器,它能監聽所有的 I/O 操作,並且以事件的方式通知我們哪些 I/O 已經就緒了。

程式碼如下:

import java.nio.channels.Selector;

...

private static Selector selector;
static {
    try {
        selector = Selector.open();
    } catch (IOException e) {
        e.printStackTrace();
    }
}

建立非阻塞 I/O

下面,我們來建立一個非阻塞的 SocketChannel,程式碼與阻塞實現型別,唯一不同是socketChannel.configureBlocking(false)

注意:只有在socketChannel.configureBlocking(false)之後的程式碼,才是非阻塞的,如果socketChannel.connect()在設定非阻塞模式之前,那麼連線操作依舊是阻塞呼叫的。

SocketChannel socketChannel = SocketChannel.open();
SocketAddress remote = new InetSocketAddress(host, port);
// 設定非阻塞模式
socketChannel.configureBlocking(false);
socketChannel.connect(remote);

建立選擇器與 socket 的關聯

選擇器與 socket 都建立好了,下一步就是將兩者進行關聯,好讓選擇器和監聽到 Socket 的變化。這裡採用了以 SocketChannel 主動註冊到選擇器的方式進行關聯繫結,這也就解釋了,為什麼不直接new Socket(),而是以SocketChannel的方式來建立 socket。

程式碼如下:

socketChannel.register(selector,
                        SelectionKey.OP_CONNECT
                        | SelectionKey.OP_READ
                        | SelectionKey.OP_WRITE);

上面程式碼,我們將 socketChannel 註冊到了選擇器中,並且對它的連線、可讀、可寫事件進行了監聽。

具體的事件監聽型別如下:

操作型別 描述 所屬物件
OP_READ 1 << 0 讀操作 SocketChannel
OP_WRITE 1 << 2 寫操作 SocketChannel
OP_CONNECT 1 << 3 連線socket操作 SocketChannel
OP_ACCEPT 1 << 4 接受socket操作 ServerSocketChannel

選擇器監聽 socket 變化

現在,選擇器已經與我們關心的 socket 進行了關聯。下面就是感知事件的變化,然後呼叫相應的處理機制。

這裡與 Linux 下的 selector 有點不同,nio 下的 selecotr 不會去遍歷所有關聯的 socket。我們在註冊時設定了我們關心的事件型別,每次從選擇器中獲取的,只會是那些符合事件型別,並且完成就緒操作的 socket,減少了大量無效的遍歷操作。

public void select() throws IOException {
    // 獲取就緒的 socket 個數
    while (selector.select() > 0){

        // 獲取符合的 socket 在選擇器中對應的事件控制代碼 key
        Set keys = selector.selectedKeys();

        // 遍歷所有的key
        Iterator it = keys.iterator();
        while (it.hasNext()){

            // 獲取對應的 key,並從已選擇的集合中移除
            SelectionKey key = (SelectionKey)it.next();
            it.remove();

            if (key.isConnectable()){
                // 進行連線操作
                connect(key);
            }
            else if (key.isWritable()){
                // 進行寫操作
                write(key);
            }
            else if (key.isReadable()){
                // 進行讀操作
                receive(key);
            }
        }
    }
}

注意:這裡的selector.select()是同步阻塞的,等待有事件發生後,才會被喚醒。這就防止了 CPU 空轉的產生。當然,我們也可以給它設定超時時間,selector.select(long timeout)來結束阻塞過程。

處理連線就緒事件

下面,我們分別來看下,一個 socket 是如何來處理連線、寫入資料和讀取資料的(這些操作都是阻塞的過程,只是我們將等待就緒的過程變成了非阻塞的了)。

處理連線程式碼:

// SelectionKey 代表 SocketChannel 在選擇器中註冊的事件控制代碼
private void connect(SelectionKey key) throws IOException {
    // 獲取事件控制代碼對應的 SocketChannel
    SocketChannel channel = (SocketChannel) key.channel();

   // 真正的完成 socket 連線
    channel.finishConnect();

   // 列印連線資訊
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();
    int port = remote.getPort();
    System.out.println(String.format("訪問地址: %s:%s 連線成功!", host, port));
}

處理寫入就緒事件

// 字符集處理類
private Charset charset = Charset.forName("utf8");

private void write(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
    String host = remote.getHostName();

    // 獲取 HTTP 請求,同上一篇
    String request = HttpUtil.compositeRequest(host);

    // 向 SocketChannel 寫入事件 
    channel.write(charset.encode(request));

    // 修改 SocketChannel 所關心的事件
    key.interestOps(SelectionKey.OP_READ);
}

這裡有兩個地方需要注意:

  • 第一個是使用 channel.write(charset.encode(request)); 進行資料寫入。有人會說,為什麼不能像上面同步阻塞那樣,通過PrintWriter包裝類進行操作。因為PrintWriter的 write() 方法是阻塞的,也就是說要等資料真正從 socket 傳送出去後才返回。

這與我們這裡所講的阻塞是不一致的,這裡的操作雖然也是阻塞的,但它發生的過程是在資料從使用者空間到核心緩衝區拷貝過程。至於系統將緩衝區的資料通過 socket 傳送出去,這不在阻塞範圍內。也解釋了為什麼要用 Charset 對寫入內容進行編碼了,因為緩衝區接收的格式是ByteBuffer

  • 第二,選擇器用來監聽事件變化的兩個引數是 interestOps 與 readyOps

    • interestOps:表示 SocketChannel 所關心的事件型別,也就是告訴選擇器,當有這幾種事件發生時,才來通知我。這裡通過key.interestOps(SelectionKey.OP_READ);告訴選擇器,之後我只關心“讀就緒”事件,其他的不用通知我了。

    • readyOps:表示 SocketChannel 當前就緒的事件型別。以key.isReadable()為例,判斷依據就是:return (readyOps() & OP_READ) != 0;

處理讀取就緒事件

private void receive(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    channel.read(buffer);
    buffer.flip();
    String receiveData = charset.decode(buffer).toString();

    // 當再沒有資料可讀時,取消在選擇器中的關聯,並關閉 socket 連線
    if ("".equals(receiveData)) {
        key.cancel();
        channel.close();
        return;
    }

    System.out.println(receiveData);
}

這裡的處理基本與寫入一致,唯一要注意的是,這裡我們需要自行處理去緩衝區讀取資料的操作。首先會分配一個固定大小的緩衝區,然後從核心緩衝區中,拷貝資料至我們剛分配固定緩衝區上。這裡存在兩種情況:

  • 我們分配的緩衝區過大,那多餘的部分以0補充(初始化時,其實會自動補0)。
  • 我們分配的緩衝去過小,因為選擇器會不停的遍歷。只要 SocketChannel 處理讀就緒狀態,那下一次會繼續讀取。當然,分配過小,會增加遍歷次數。

最後,將一下 ByteBuffer 的結構,它主要有 position, limit,capacity 以及 mark 屬性。以 buffer.flip(); 為例,講下各屬性的作用(mark 主要是用來標記之前 position 的位置,是在當前 postion 無法滿足的情況下使用的,這裡不作討論)。

從圖中看出,

  • 容量(capacity):表示緩衝區可以儲存的資料容量;
  • 極限(limit):表示緩衝區的當前終點,即寫入、讀取都不可超過該重點;
  • 位置(position):表示緩衝區下一個讀寫單元的位置;

完整程式碼

package com.jason.network.mode.nio;

import com.jason.network.constant.HttpConstant;
import com.jason.network.util.HttpUtil;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NioNonBlockingHttpClient {

    private static Selector selector;
    private Charset charset = Charset.forName("utf8");

    static {
        try {
            selector = Selector.open();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws IOException {

        NioNonBlockingHttpClient client = new NioNonBlockingHttpClient();

        for (String host: HttpConstant.HOSTS) {

            client.request(host, HttpConstant.PORT);

        }

        client.select();

    }

    public void request(String host, int port) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.socket().setSoTimeout(5000);
        SocketAddress remote = new InetSocketAddress(host, port);
        socketChannel.configureBlocking(false);
        socketChannel.connect(remote);
        socketChannel.register(selector,
                        SelectionKey.OP_CONNECT
                        | SelectionKey.OP_READ
                        | SelectionKey.OP_WRITE);
    }

    public void select() throws IOException {
        while (selector.select(500) > 0){
            Set keys = selector.selectedKeys();

            Iterator it = keys.iterator();

            while (it.hasNext()){

                SelectionKey key = (SelectionKey)it.next();
                it.remove();

                if (key.isConnectable()){
                    connect(key);
                }
                else if (key.isWritable()){
                    write(key);
                }
                else if (key.isReadable()){
                    receive(key);
                }
            }
        }
    }

    private void connect(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        channel.finishConnect();
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();
        int port = remote.getPort();
        System.out.println(String.format("訪問地址: %s:%s 連線成功!", host, port));
    }

    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        InetSocketAddress remote = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
        String host = remote.getHostName();

        String request = HttpUtil.compositeRequest(host);
        System.out.println(request);

        channel.write(charset.encode(request));
        key.interestOps(SelectionKey.OP_READ);
    }

    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        channel.read(buffer);
        buffer.flip();
        String receiveData = charset.decode(buffer).toString();

        if ("".equals(receiveData)) {
            key.cancel();
            channel.close();
            return;
        }

        System.out.println(receiveData);
    }
}

示例效果

總結

本文從 nio 的阻塞方式講起,介紹了阻塞 I/O 與非阻塞 I/O 的區別,以及在 nio 下是如何一步步構建一個 IO 多路複用的模型的客戶端。文中需要理解的內容比較多,如果有理解錯誤的地方,歡迎指正~

補充1:基於NIO的多路複用客戶端(執行緒池版)

public static void main(String[] args) {
    基於執行緒池的偽非同步NIO模型 a = new 基於執行緒池的偽非同步NIO模型();
  a.startServer(); }
private Charset charset = Charset.forName("utf8");   class WriteThread implements Runnable {
    private SelectionKey key;
 public WriteThread(SelectionKey key) {
        this.key = key;
  }
    @Override
  public void run() {
        SocketChannel socketChannel = (SocketChannel) key.channel();
  Socket socket = socketChannel.socket();
 try {
            socketChannel.finishConnect();
  } catch (IOException e) {
            e.printStackTrace();
  }
        InetSocketAddress remote = (InetSocketAddress) socketChannel.socket().getRemoteSocketAddress();
  String host = remote.getHostName();
 int port = remote.getPort();
  System._out_.println(String.format("訪問地址: %s:%s 連線成功!", host, port));    }
}
class ReadThread implements Runnable {
    private SelectionKey key;
 public ReadThread(SelectionKey key) {
        this.key = key;
  }
    @Override
  public void run() {
        SocketChannel socketChannel = (SocketChannel) key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
 try {
            socketChannel.read(buffer);
  } catch (IOException e) {
            e.printStackTrace();
  }
        buffer.flip();
  String receiveData = null;
 try {
            receiveData = new String(buffer.array(), "utf8");
  } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
  }

        if ("".equals(receiveData)) {
            key.cancel();
 try {
                socketChannel.close();
  } catch (IOException e) {
                e.printStackTrace();
  }
            return;
  }

        System._out_.println(receiveData);
  }
}
class ConnectThread implements Runnable {
    private SelectionKey key;
 public ConnectThread(SelectionKey key) {
        this.key = key;
  }
    @Override
  public void run() {
        SocketChannel socketChannel = (SocketChannel) key.channel();
  ByteBuffer byteBuffer = charset.encode("hello world");
 try {
            socketChannel.write(byteBuffer);
  System._out_.println("hello world");
  } catch (IOException e) {
            e.printStackTrace();
  }
        key.interestOps(SelectionKey._OP_READ_);
  }
}
public void startServer() {
    ExecutorService executorService = Executors.newFixedThreadPool(10);
 try {
        SocketChannel socketChannel = SocketChannel.open();
  Selector selector = Selector.open();    socketChannel.configureBlocking(false);
  InetSocketAddress inetAddress = new InetSocketAddress(1234);    socketChannel.connect(inetAddress);
  socketChannel.register(selector, SelectionKey._OP_CONNECT_ |
                SelectionKey._OP_READ_ |
                SelectionKey._OP_WRITE_);   while (selector.select(500) > 0) {
            Iterator

補充2:基於NIO的多路複用服務端

class NioNonBlockingHttpServer {

    private static Selector _selector_;
 private Charset charset = Charset.forName("utf8");   static {
        try {
            _selector_ = Selector.open();
  } catch (IOException e) {
            e.printStackTrace();
  }
    }

    public static void main(String[] args) throws IOException {

        NioNonBlockingHttpServer httpServer = new NioNonBlockingHttpServer();
  httpServer.select();    }

    public void request(int port) throws IOException {
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
  serverSocketChannel.socket().setSoTimeout(5000);
  serverSocketChannel.configureBlocking(false);
  serverSocketChannel.socket().bind(new InetSocketAddress(8383)); //        serverSocketChannel.register(selector, //                SelectionKey.OP_CONNECT //                        | SelectionKey.OP_READ //                        | SelectionKey.OP_WRITE);
  }

    public void select() throws IOException {
        while (_selector_.select(500) > 0) {
            Set keys = _selector_.selectedKeys();    Iterator it = keys.iterator();   while (it.hasNext()) {

                SelectionKey key = (SelectionKey) it.next();
  it.remove();   if (key.isAcceptable()) {
                    accept(key);
  } else if (key.isWritable()) {
                    write(key);
  } else if (key.isReadable()) {
                    receive(key);
  }
            }
        }
    }

    private void accept(SelectionKey key) throws IOException {
        SocketChannel socketChannel;
  ServerSocketChannel channel = (ServerSocketChannel) key.channel();
  socketChannel = channel.accept();//接受連線請求
  socketChannel.configureBlocking(false);    socketChannel.register(_selector_, SelectionKey._OP_READ_ | SelectionKey._OP_WRITE_);    InetSocketAddress local = (InetSocketAddress) channel.socket().getLocalSocketAddress();
  String host = local.getHostName();
 int port = local.getPort();
  System._out_.println(String.format("請求地址: %s:%s 接收成功!", host, port));      }

    private void write(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();    InetSocketAddress local = (InetSocketAddress) channel.socket().getRemoteSocketAddress();
  String host = local.getHostName();
  String msg = "hello Client";
  channel.write(charset.encode(msg));    System._out_.println(msg);
  key.interestOps(SelectionKey._OP_READ_);
  }

    private void receive(SelectionKey key) throws IOException {
        SocketChannel channel = (SocketChannel) key.channel();
  ByteBuffer buffer = ByteBuffer.allocate(1024);
  channel.read(buffer);
  buffer.flip();
  String receiveData = charset.decode(buffer).toString();   if ("".equals(receiveData)) {
            key.cancel();
  channel.close();
 return;  }

        System._out_.println(receiveData);
  }