1. 程式人生 > >Nio程式設計模型總結

Nio程式設計模型總結

終於,這兩天的考試熬過去了, 興致沖沖的來整理筆記來, 這篇部落格是我近幾天的NIO印象筆記彙總,記錄了對Selector及Selector的重要引數的理解,對Channel的理解,常見的Channel,對NIO事件驅動的程式設計模型的理解,NIO與傳統IO的對比,NIO的TCP/IP程式設計的實踐.

Channel

什麼是Channel

這個概念絕對是一級概念,Channel是一個管道,用於連線位元組緩衝區和另一端的實體, 這個位元組緩衝區就是ByteBuffer, 另一端的實體可以是一個File 或者是 Socket ;

或者基於IO的網路程式設計, 資料的互動藉助於InputStream或者是OutputStream, 而Channel可以理解成對Stream的又一層封裝;在這種程式設計模型中 服務端想和客戶端進行互動,就需要從服務端自己的ServerSocketChannel中獲取前來連線的客戶端的SocketChannel,並把他註冊關聯上感性趣的事件且自己的Selector選擇器上, 這樣一旦客戶端把Buffer中的資料推送進channel, 服務端就可以感知,進而處理

常用的Chanenl

  • 檔案通道: FileChannel
  • 套接字通道
    • 服務端: ServerSocketChannel
    • 客戶端: SocketChannel
  • 資料包通道: DataGramSocket

Channel 與 Stream

Channel的NIO程式設計模型中一大元件,它類似IO中的Stream,但是兩者也有本質的區別;

為什麼說是類似呢? 看下面的兩段程式碼, 需求是磁碟上的檔案進行讀寫

在IO程式設計中,我們第一步可能要像下面這樣獲取輸入流,按位元組把磁碟上的資料讀取到程式中,再進行下一步操作

FileInputStream fileInputStream = new FileInputStream("123.txt");

在NIO程式設計中,目標是需要先獲取通道,再基於Channel進行讀寫

FileInputStream fileInputStream = new FileInputStream("123.txt");
FileChannel channel = fileInputStream.channel();

對使用者來說,在IO / NIO 中這兩種都直接關聯這磁碟上的資料檔案,資料的讀寫首先都是獲取Stream和Channel,所以說他們相似;

但是: 對於Stream來說,所有的Stream都是單向的,對我們的程式來說,Stream要麼只能是從裡面獲取資料的輸入流,要麼是往裡面輸入資料的輸出流,因為InputStream和outputStream都是抽象類,在java中是不支援多繼承的, 而通道不同,他是雙向的,對一個通道可讀可寫

怎麼理解 Channel可以是雙向的?

如上圖,凡是同時實現了readable,writeable介面的類,都雙向的通道. 下面是典型的例子

SocketChannel
在NIO網路程式設計中,服務端可以通過ServerSocketChannel獲取客戶端的SocketChannel
這個SocketChannel可以read() 客戶端的訊息存入Buffer, 往客戶端 write()buffer裡的內容
socketChannel1.read(byteBuffer);
socketChannel1.write(byteBuffer);

對於一個channel,我們既能從中獲取資料,也能往外read資料

基於channel的檔案拷貝方式和傳統的IO拷貝的競速

效率最低的按位元組拷貝

public static  void text4() throws IOException {
    System.out.println("開始: ... ");
        FileInputStream    fis = new FileInputStream("123.txt");
        FileOutputStream   fos = new FileOutputStream("output123.txt");
    int read=0;
    long start =0;
    while((read=fis.read())!=-1){
        fos.write(read);
    }
    System.out.println("耗時: "+(System.currentTimeMillis()-start) );
    fis.close();
    fos.close();
}

一個3901KB的檔案的拷貝,在我的機器上跑出了 1561097384707 的好成績; 實屬無奈,擦點以為編譯器卡死


以NIO,channel+buffer的模型,拷貝檔案

try (
    FileInputStream  fis = new FileInputStream("123.txt");
    FileOutputStream   fos = new FileOutputStream("output123.txt");
){
    //1.獲取通道
    FileChannel   inChannel = fis.getChannel();
    FileChannel   outChannel = fos.getChannel();

    //2.分配指定大小的緩衝區
    ByteBuffer buffer = ByteBuffer.allocate(1024);
    long start = System.currentTimeMillis();
    //3.將通道中的資料緩衝區中
    while (inChannel.read(buffer) != -1) {
        buffer.flip();//切換成都資料模式
        //4.將緩衝區中的資料寫入通道中
        outChannel.write(buffer);
        buffer.clear();//清空緩衝區
    }
    System.out.println("總耗時:" + (System.currentTimeMillis() - start));
} catch (Exception e) {
    e.printStackTrace();
}

速度明顯提升 大約平均耗時 110


NIO+零拷貝 複製檔案

  // 直接獲取通道
    FileChannel inChannel2 = FileChannel.open(Paths.get("123.txt"), StandardOpenOption.READ);
    FileChannel outChannel2 = FileChannel.open(Paths.get("output123.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
    //記憶體對映檔案
    MappedByteBuffer inMappedBuf = inChannel2.map(FileChannel.MapMode.READ_ONLY, 0, inChannel2.size());
    MappedByteBuffer outMappedBuf = outChannel2.map(FileChannel.MapMode.READ_WRITE, 0, inChannel2.size());
    //直接對緩衝區進行資料讀寫操作
    byte[] dst = new byte[inMappedBuf.limit()];
    long start = System.currentTimeMillis();
    inMappedBuf.get(dst);
    outMappedBuf.put(dst);
    System.out.println("耗費的時間為:" + ( System.currentTimeMillis() - start));

    inChannel2.close();
    outChannel2.close();

或者

/*
     * 通道之間的資料傳輸(直接緩衝區)
     */
    FileChannel inChannel3 = FileChannel.open(Paths.get("123.txt"), StandardOpenOption.READ);
    FileChannel outChannel3 = FileChannel.open(Paths.get("output123.txt"), StandardOpenOption.WRITE, StandardOpenOption.READ, StandardOpenOption.CREATE);
    long start = System.currentTimeMillis();
    inChannel3.transferTo(0, inChannel3.size(), outChannel3);
    System.out.println("耗時: "+(System.currentTimeMillis()-start) );

    //等價於
    // outChannel3.transferFrom(inChannel3, 0, inChannel3.size());

    inChannel3.close();
    outChannel3.close();

零拷貝僅需要耗時 6 就可以完成


NIO的非阻塞與IO的阻塞

什麼是阻塞? 舉個例子, 如果有一天我碰到了不會的作業題,於是我給老師發了條短息請教咋做, 這時,假如我進入了阻塞模式,我就會一直瞅著手機,別的也不幹,就等著老師回信息, 假如我進入了非阻塞的模式,發完簡訊後跳過這個題,去做別的題

常見的阻塞比如, 鍵盤錄入, Socket的accept()以及IO的read write, 全部會卡在那行程式碼直到執行完畢才會往下執行, 這種風格的好處是顯而易見的, 及其容易的進行順序程式設計

但是在NIO中,channel的read,write可以是阻塞的,也可以是非阻塞的,這取決於channel是否阻塞, 一般在進行網路程式設計時,要搭配上selector選擇器,一起用, 同時channel我們也會設定成非阻塞的, 想想也不能讓伺服器的讀寫阻塞住,因為它可不是面對一兩個使用者,我們需要它可以一遍一遍的正常流水執行

在客戶端,connect方法不再是阻塞的,和服務端進行資料互動之前,java提供了檢查機確保連線百分百健康, 如果服務端沒有接受連線,客戶端是是沒辦法進一步操作的

if (selectionKey.isConnectable()) {
// 強轉成 有連線事件發生的Channel
client = (SocketChannel) selectionKey.channel();
// 完成連線
if (client.isConnectionPending()) {
client.finishConnect();

從通道中的read和write方法也不是阻塞的,即可返回,可以讓服務端的業務程式碼很流暢的執行完,再接受新的請求,處理新請求

Selector

Selector選擇器NIO的第三個元件,三者的關係圖如上所示

什麼是selector? 作用是什麼?

selector是選擇器的意思, 和它直接關聯的元件是Channel, 沒錯,它的作用就是不斷的輪詢繫結在他身上的所有channel. 一旦有通道發生了它感興趣的事件,接著處理此事件

selector維護了什麼?

無論是服務端的Selector 還是客戶端的Selector 它都維護了三個Set集合 , 裡面封裝的是 SelectionKey, 他是channel註冊進Selector的產物,一般是使用它反向獲取channel

  1. key set
  • 他是一個全集,每當channel通過register方法註冊進選擇器時,於此同時也會把包含自己資訊的key新增到這個全集中來 註冊的資訊就會以SelectionKey的封裝形式儲存在這個集合中, 選擇器每次輪詢的channel,就是這裡面的channel
  1. selected key
  • 感興趣的key的集合, 舉個例子, 通道1註冊進選擇器時,告訴選擇器,我可能會給你發信息,你得盯著我,讀我給你的資訊, 於是選擇器對通道1感性趣的事件是 read, 那麼在選擇器輪詢channel時, 一旦通道1出現了write操作,就會被選擇器感知,開始read

  • 每次遍歷selected key時我們會執行這行程式碼:Set<SelectionKey> selectionKeys = selector.selectedKeys(); 它的意思是,我們取出了 選擇器的感性事件的set集合,只要程式還在執行,只要選擇器一旦被open(),除非我們手動的close() 否則選擇器物件就不會被釋放,所以它的感興趣的set集合是不會被自動會收到,於是我們就得收到的把處理過的感興趣的事件對應的SelectionKey移除出這個set集合,不然下一次輪詢時,這個事件還會再一次被處理,並且無限制的處理下去

  • key有且僅有兩種方式從 selected-key-set 中剔除 1. 通過Set的remove()方法, 2.通過迭代器的remove()方法

  1. cannelled key
  • 取消的key的集合,代表原來感興趣的事件,現在不感興趣了. 下一次輪詢,進行select() 本集合中的SelectionKey會從key set中移除, 意味著它所關聯的channel將會被選擇器丟棄掉,不再進行監聽
  • 關閉channel 或者是呼叫了cancel()方法都會將key新增到cannelled key 集合中
  • 使用場景: 一般會在客戶端主動斷開連線的時候使用它.

selector的select()方法

select(long); // 設定超時時間

selectNow(); // 立即返回,不阻塞

select(); 阻塞輪詢

select()過程的細節:

  • 第一步, cannelled-key中的每一個元素會從全集key set中剔除,表示這些可以關聯的通道不會被註冊
  • 第二步作業系統幫我們輪詢每一個通道是否有選擇器感性趣的事情發生
    • 對於一條準備就緒的channel(發生事件通道),他至少會發生下面兩件事之一:
      • 它的key會被新增進selected-key-set中,來標識它將被選中,進而處理
      • 如果它的key,已經存在於這個集合中了,下一步就是它的 read-operation將被更新
  • 第三步: 如果在輪詢時發現了有任何key被放置在了cannelled-key-set中,重複第一步,不再註冊它關聯的通道

romove key 和 cannel key 的區別

前者是把key從selected key set集合,也就是被選中的集合中剔除出去,表示當前的事件已經處理完了

後者是表示,把key從全集中剔除出去, 表示想要廢棄這個key關聯的channel

selector的建立

他是根據不同作業系統提供的不同的Provider使用provide()創建出來的

NIO程式設計模型


如上圖, 在NIO網路程式設計模式中,不再是傳統的多執行緒程式設計模型,當有新的客戶端的連線到來,不再重新開闢新的執行緒去跑本次連線,而是統一,一條執行緒處理所有的連線, 而一次連線本質上就是一個Channel, NIO網路程式設計模型是基於事件驅動型的; 即,有了提前約定好的事件發生,接著處理事件,沒有時間發生,選擇器就一直輪詢 下面解釋上圖的流程

  1. 服務端建立代表服務端的Channel,繫結好埠,設定成非阻塞的通道 並且初始化選擇器,然後開始輪詢繫結在自己身上的通道,此時的通道只有一個ServerSocketChannel,而選擇器只關心ServerSocketChannel上發生的OP_ACCEPT事件,而又沒有客戶端來連結 所以他被阻塞在了select()
System.out.println("Server...");
// 獲取服務端的SerSokcetChannel
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// todo 一定要把他配置成 非阻塞的
serverSocketChannel.configureBlocking(false);

// 從通道中獲取 服務端的物件
ServerSocket serverSocket = serverSocketChannel.socket();
serverSocket.bind(new InetSocketAddress(8899));

// 建立選擇器
Selector selector = Selector.open();
// 把通到註冊到 選擇器上
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

  while (true) {
            // 阻塞式等待 channel上有事件發生
            int select = selector.select();
  1. 客戶端 建立代表自己的SocketChannel, 建立選擇器,把自己的註冊在上面,如下程式碼, 初始化自己,SocketChannel, 把客戶端的通道註冊進選擇器,並告訴選擇器SocketChannel的感興趣事件是OP_CONNECT連線事件; 當執行到下面的socketChannel.connect(new InetSocketAddress("localhost", 8899)); 連線的請求就已經發送出去了,也就是說,如果沒有意外,執行完這一行程式碼,服務端的select()方法已經返回了, 但是客戶端的connect()是非阻塞的,立即返回,故在客戶端依然會繼續執行, 進而判斷一下是否是真的連線上了
// 獲取客戶端的通道
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);

Selector selector = Selector.open();
// 把客戶端的通道註冊進選擇器
socketChannel.register(selector, SelectionKey.OP_CONNECT);
// todo 連線客戶端, 執行完這行程式碼後, 服務端就能就收到通知!!!
socketChannel.connect(new InetSocketAddress("localhost", 8899));

while (true) {
    int number = selector.select(); // 選擇器阻塞式的 等待 Channel上發生它關心的事件
    System.out.println(" 發生了感興趣的事件: " + number);
    Set<SelectionKey> keySet = selector.selectedKeys();
// 驗證
    for (SelectionKey selectionKey : keySet) {
        SocketChannel client = null;
if (selectionKey.isConnectable()) {
    // 強轉成 有連線事件發生的Channel
    client = (SocketChannel) selectionKey.channel();
    // 完成連線
    if (client.isConnectionPending()) {
        client.finishConnect();
        ByteBuffer byteBuffer = ByteBuffer.allocate(512);
        byteBuffer.put((LocalDate.now() + "連線成功").getBytes());
        byteBuffer.flip();
        client.write(byteBuffer);
  1. 對於服務端,輪詢了這麼久,終於有連線進來了,於是進一步處理, 判斷如果當前的連線是請求建立連線的話,就去建立連線, 對於服務端來說,建立連線就是然服務端記住客戶端, 客戶端是誰呢?SocketChanel, 怎麼獲取呢? serverSocketChannel1.accept(); 怎麼建立連線呢? 實際上就是把當前的客戶端的channel註冊在服務端的選擇器上,並告訴它自己關心的事件啥, 當然一開始建立連線時, 服務端肯定首先要做的就是監聽客戶端傳送過來的資料,於是 繫結上感興趣的事件是read, 並且不要忘了,每次遍歷感興趣的key的集合時,都要及時的把當前的key剔除
selectionKeys.forEach(selectionKey -> {
    SocketChannel socketChannel = null;
    String sendKey = null;
    try {
        if (selectionKey.isAcceptable()) {
            // 1. 使用者請求建立連線, 根據SelectionKey 獲取服務端的通道
            // todo 當前的這個SelecttionKey 是有 ServerSocketChannel 和 selector 聯絡生成的, 因此我們 強制轉換回 ServerSocketChannel
            ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();

            // todo  !!!!!!!  這是重點, 這裡的accept是非阻塞的 !!!!!!!!
            // 根據服務的 通道  獲取到客戶端的通道
            socketChannel = serverSocketChannel1.accept();
            System.out.println("socketChannel.class: " + socketChannel.getClass());
            // todo 配置成非阻塞的
            socketChannel.configureBlocking(false);

            // todo 新獲取的通道 註冊進選擇器
            socketChannel.register(selector, SelectionKey.OP_READ);

            // 儲存客戶端的資訊
            String key = "[ " + UUID.randomUUID().toString() + " ]";
            clientMap.put(key, socketChannel);
            // todo   把 擁有當前事件SelectionKey 剔除
  1. 對於客戶端,如果它想往服務端傳送鍵盤錄入的內容時,獲取鍵盤錄入物件是免不了的事, 但是這物件會阻塞,於是客戶端不得不開啟一條新的執行緒執行讀取鍵盤錄入,讓自己具有鍵盤錄入的功能,同時又不會被阻塞, 如果客戶端想要接受服務端推送回來的資料怎麼辦呢? 於是我們就得告訴客戶端的選擇器,新增一個感興趣的事件,read, 這樣,一旦服務端有資料推送過來的,客戶端的選擇器就會感知到這個事件,並且這個事件的selectionKay是可讀的,這樣一個比較完善的客戶端就ok了
executorService.submit(() -> {
    while (true) {
        try {
            // 清空上面的快取
            byteBuffer.clear();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));
            String msg = bufferedReader.readLine();
            byteBuffer.put(msg.getBytes());
            byteBuffer.flip();
            finalClient.write(byteBuffer);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
});
}

// 上面的程式碼是發生了 請求連線事件
// todo 給客戶端註冊一個讀取客戶端返回資料的事件
client.register(selector, SelectionKey.OP_READ);
  1. 服務端在建立連線時,就給客戶端的通道綁定了感興趣的事件是read, 於是當客戶端往channel中write資料了,服務端就會來到下面的程式碼塊, 如果是群聊的話, 我們就得知道,往哪些使用者轉發資訊, 於是我們提前構造了map,這個map存放就是一個一個和服務的channel建立連線的SocketChannel; 只需要遍歷map, 往裡面的chanel,write資料即可
 else if (selectionKey.isReadable()) {
    System.out.println("readable...");
    // 獲取客戶端的通道
    socketChannel = (SocketChannel) selectionKey.channel();
    System.out.println("當前的客戶端 通道例項: socketChannel == " + socketChannel);
    // 獲取當前 是哪個客戶端發起的資訊
    ByteBuffer byteBuffer = ByteBuffer.allocate(512);
    // 讀取客戶端傳送的訊息
    while (true) {// todo todo todo  很重要的一點!!!  read方法是非阻塞的, 很可能還有沒讀取到資料就返回了
        int read = socketChannel.read(byteBuffer);
        System.out.println("read == : " + read);
        if (read <= 0) {
            break;
        }
    }
    // 往其他客戶端寫
    byteBuffer.flip();
    Charset charset = Charset.forName("utf-8");
    String msg = String.valueOf(charset.decode(byteBuffer).array());
    // Buffer轉字串
    System.out.println("收到客戶端: " + socketChannel + "  傳送的訊息: " + msg);
    // 遍歷map
    for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
        if (socketChannel == map.getValue()) {
            sendKey = map.getKey();
        }
    }
    // todo 轉發給全部的客戶端傳送
    for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
        SocketChannel socketChannel1 = map.getValue();
        ByteBuffer byteBuffer1 = ByteBuffer.allocate(512);
        // 把資訊放進 byteBuffer1中
        String message = msg + " : " + sendKey;
        byteBuffer1.put(message.getBytes());
        byteBuffer.flip();
        socketChannel1.write(byteBuffer);
    }
  1. 客戶端斷開了怎麼辦呢? 在一臺電腦上,手動將一個客戶端停掉,服務端會執行到selectionKey.isReadable() 並且進入這個if塊, 當它嘗試從裡面讀取的時候,就發現這個連線已經壞掉了,於是報錯,強制斷開連線, 因為還要繼續輪詢,全集key set 中依然儲存著當前的客戶端的channel, 所以會一直報錯下去, 怎麼辦呢? 如下
//  selectionKey.cancel();  常規

try {
    // 這樣也能取消這個鍵
    socketChannel.close();
} catch (IOException e1) {
    e1.printStackTrace();
}

// 當然我們現在還要多一步,  因為他還在我們的map裡面  不然一會發訊息的時候,會出錯
// todo 移除出map 中失效的 channel
// todo 遍歷map
for (Map.Entry<String, SocketChannel> map : clientMap.entrySet()) {
    if (socketChannel == map.getValue()) {
        sendKey = map.getKey();
    }
}
clientMap.remove(sendKey, socketChannel);