1. 程式人生 > >Java NIO | 流程與示例程式碼

Java NIO | 流程與示例程式碼

部落格引用處(以下內容在原有部落格基礎上進行補充或更改,謝謝這些大牛的部落格指導):
java NIO示例以及流程詳解
Java NIO深入理解ServerSocketChannel
併發程式設計網
Java NIO之Selector(選擇器)

NIO執行流程:

第一步:啟動server伺服器,初始化多路複用器selector、ServerSocketChannel通道、設定通道的模式為非阻塞、註冊channel到selector上,並監聽accept請求;

第二步:啟動server伺服器,迴圈selectionKeys,當有channel準備好時就處理,否則一直迴圈或者用間隔輪詢的方式,比如阻塞1S後再獲取selectionKeys;

第三步:啟動client端,初始化多路複用器selector、SocketChannel通道,設定通道的模式為非阻塞(這是連線準備就緒,還未連線就緒/連線成功);

第四步:client首先嚐試連線server,此時socketChannel.connect(new InetSocketAddress(this.host, this.port)返回false,表示server還沒有返回資訊,server收到連線請求後,監聽到client的接入請求,會初始化一個client通道物件在服務端、並將新接入的client註冊到多路複用器Selector上,並應答client;再回到client端,由於client沒有及時收到server端的應答(因為是非阻塞式的),所以client會設定監聽一個connect請求,socketChannel.register(selector, SelectionKey.OP_CONNECT),當server返回應答資訊時,client會收到一個connect請求,key.isConnectable(),如果此時sc.finishConnect()連線完成,client會監聽一個read請求,並像server傳送資料doWrite(sc),然後server會收到一個read請求,key.isReadable()處理完後返回給client,client也會收到一個讀請求,收到server的返回資料,此時,整個互動過程結束;

在這裡插入圖片描述

服務端:

package nio;
 
import org.springframework.util.StringUtils;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;
 
public class ServerSocketChannels implements Runnable {
 
    private  ServerSocketChannel serverSocketChannel;
 
    private  Selector selector;
 
    private volatile boolean stop;
 
 
    public ServerSocketChannels(int port){
 
        try {
            //建立多路複用器selector,工廠方法
            selector = Selector.open();
            //建立ServerSocketChannel,工廠方法
            serverSocketChannel = ServerSocketChannel.open();
            //繫結ip和埠號,預設的IP=127.0.0.1,對連線的請求最大佇列長度設定為backlog=1024,如果佇列滿時收到連線請求,則拒絕連線
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            //設定非阻塞方式
            serverSocketChannel.configureBlocking(false);
            //註冊serverSocketChannel到selector多路服用器上面,監聽accrpt請求
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("the time is start port = " + port);
        } catch (IOException e) {
            e.printStackTrace();
            //用於非正常退出JVM虛擬機器,停掉所有運作 0代表正常退出,非零代表非正常退出
            System.exit(1);
        }
    }
 
    public void stop(){
        this.stop = true;
    }
 
    @Override
    public void run() {
        //如果server沒有停止
        while(!stop){
            try {
                //selector.select()會一直阻塞到有一個通道在你註冊的事件上就緒了
                //selector.select(1000)會阻塞到1s後然後接著執行,相當於1s輪詢檢查
                selector.select(1000);
                //找到所有準備接續的key
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectionKeys.iterator();
                SelectionKey key = null;
               while(it.hasNext()){
                   key = it.next();
                   it.remove();
                   try {
                       //處理準備就緒的key
                       handle(key);
                   }catch (Exception e){
                       if(key != null){
                           //請求取消此鍵的通道到其選擇器的註冊
                           key.cancel();
                           //關閉這個通道
                           if(key.channel() != null){
                               key.channel().close();
                           }
                       }
                   }
               }
            } catch (Throwable e) {
                e.printStackTrace();
            }
        }
        if(selector != null){
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
 
    public void handle(SelectionKey key) throws IOException {
         //如果key是有效的
          if(key.isValid()){
              //監聽到有新客戶端的接入請求
              //完成TCP的三次握手,建立物理鏈路層
              if(key.isAcceptable()){
                  ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                  SocketChannel sc = (SocketChannel) ssc.accept();
                  //設定客戶端鏈路為非阻塞模式
                  sc.configureBlocking(false);
                  //將新接入的客戶端註冊到多路複用器Selector上
                  sc.register(selector, SelectionKey.OP_READ);
              }
              //監聽到客戶端的讀請求
              if(key.isReadable()){
                  //獲得通道物件
                  SocketChannel sc = (SocketChannel) key.channel();
                  ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                  //從channel讀資料到緩衝區
                 int readBytes = sc.read(readBuffer);
                 if (readBytes > 0){
                    //Flips this buffer.  The limit is set to the current position and then
                     // the position is set to zero,就是表示要從起始位置開始讀取資料
                     readBuffer.flip();
                     //eturns the number of elements between the current position and the  limit.
                     // 要讀取的位元組長度
                     byte[] bytes = new byte[readBuffer.remaining()];
                     //將緩衝區的資料讀到bytes陣列
                     readBuffer.get(bytes);
                     String body = new String(bytes, "UTF-8");
                     System.out.println("the time server receive order: " + body);
                     String currenttime = "query time order".equals(body) ? new Date(System.currentTimeMillis()).toString(): "bad order";
                     doWrite(sc, currenttime);
                 }else if(readBytes < 0){
                    key.channel();
                    sc.close();
                  }
              }
          }
    }
 
    public static void doWrite(SocketChannel channel, String response) throws IOException {
        if(!StringUtils.isEmpty(response)){
            byte []  bytes = response.getBytes();
            //分配一個bytes的length長度的ByteBuffer
            ByteBuffer  write = ByteBuffer.allocate(bytes.length);
            //將返回資料寫入緩衝區
            write.put(bytes);
            write.flip();
            //將緩衝資料寫入渠道,返回給客戶端
            channel.write(write);
        }
    }
}
 

服務端啟動程式:

package nio;
 
/**
 * 服務端啟動程式
 */
public class ServerMain {
 
    public static void main(String[] args) {
        int port = 8010;
        ServerSocketChannels server = new ServerSocketChannels(port);
        new Thread(server,"timeserver-001").start();
    }
}

客戶端程式:

package nio;
 
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
 
public class TimeClientHandler implements Runnable {
 
    //伺服器端的ip
    private String host;
   //伺服器端的埠號
    private int port;
   //多路服用選擇器
    private Selector selector;
 
    private SocketChannel socketChannel;
 
    private volatile boolean stop;
 
 
    public TimeClientHandler(String host, int port){
        this.host = host == null ? "127.0.0.1": host;
        this.port = port;
        try {
            //初始化一個Selector,工廠方法
            selector = Selector.open();
            //初始化一個SocketChannel,工廠方法
            socketChannel = SocketChannel.open();
            //設定非阻塞模式
            socketChannel.configureBlocking(false);
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
 
    }
 
 
    /**
     * 首先嚐試連線服務端
     * @throws IOException
     */
    public void doConnect() throws IOException {
        //如果連線成功,像多路複用器selector監聽讀請求
        if(socketChannel.connect(new InetSocketAddress(this.host, this.port))){
         socketChannel.register(selector, SelectionKey.OP_READ);
         //執行寫操作,像伺服器端傳送資料
         doWrite(socketChannel);
        }else {
            //監聽連線請求
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
    }
 
 
    public static void doWrite(SocketChannel sc) throws IOException {
      //構造請求訊息體
       byte [] bytes = "query time order".getBytes();
      //構造ByteBuffer
       ByteBuffer write = ByteBuffer.allocate(bytes.length);
      //將訊息體寫入傳送緩衝區
        write.put(bytes);
        write.flip();
       //呼叫channel的傳送方法非同步傳送
        sc.write(write);
       //通過hasRemaining方法對傳送結果進行判斷,如果訊息全部發送成功,則返回true
        if(!write.hasRemaining()){
            System.out.println("send order 2 server successd");
        }
    }
 
    @Override
    public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
            System.exit(1);
        }
        while (!stop){
            try {
                selector.select(1000);
                Set<SelectionKey> keys =  selector.selectedKeys();
                Iterator<SelectionKey> its =keys.iterator();
                SelectionKey key = null;
                while (its.hasNext()){
                    key = its.next();
                    its.remove();
                    try {
                        handle(key);
                    }catch (Exception e){
                        if(key != null){
                            key.cancel();
                            if(key.channel() != null){
                                key.channel().close();
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    }
 
    public  void handle(SelectionKey key) throws IOException {
        if(key.isValid()){
            SocketChannel sc = (SocketChannel) key.channel();
            if(key.isConnectable()){
                //如果連線成功,監聽讀請求
               if(sc.finishConnect()){
                  sc.register(this.selector, SelectionKey.OP_READ);
                  //像服務端傳送資料
                   doWrite(sc);
               }else{
                   System.exit(1);
               }
            }
            //監聽到讀請求,從伺服器端接受資料
            if(key.isReadable()){
                ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                int readBytes = sc.read(byteBuffer);
                if(readBytes > 0){
                    byteBuffer.flip();
                    byte []  bytes = new byte[byteBuffer.remaining()];
                    byteBuffer.get(bytes);
                    String body = new String(bytes,"UTF-8");
                    System.out.println("now body is "+ body);
                    stop = true;
 
                }else if(readBytes < 0){
                    key.cancel();
                    sc.close();
                }
 
 
            }
        }
    }
  //釋放所有與該多路複用器selector關聯的資源
     if(selector != null){
        try {
            selector.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

客戶端啟動程式:

package nio;
 
 
/**
 * 客戶端啟動程式
 */
public class ClientMain {
 
    public static void main(String[] args) {
      int port = 8010;
      TimeClientHandler client = new TimeClientHandler("",port);
      new Thread(client,"client-001").start();
    }
}


解釋說明:

  • java.nio.Buffer flip()方法:將快取位元組陣列的指標設定為陣列的開始序列即陣列下標0。這樣就可以從buffer開頭,對該buffer進行遍歷(讀取)了。
    也就是說呼叫flip之後,讀寫指標指到快取頭部,並且設定了最多隻能讀出之前寫入的資料長度(而不是整個快取的容量大小)。詳細:https://www.cnblogs.com/woshijpf/articles/3723364.html
  • backlog引數:
    TCP建立連線是要進行三次握手,但是否完成三次握手後,伺服器就處理(accept)呢?
    backlog其實是一個連線佇列,在Linux核心2.2之前,backlog大小包括半連線狀態和全連線狀態兩種佇列大小。
    半連線狀態為:伺服器處於Listen狀態時收到客戶端SYN報文時放入半連線佇列中,即SYN queue(伺服器埠狀態為:SYN_RCVD)。
    全連線狀態為:TCP的連線狀態從伺服器(SYN+ACK)響應客戶端後,到客戶端的ACK報文到達伺服器之前,則一直保留在半連線狀態中;當伺服器接收到客戶端的ACK報文後,該條目將從半連線佇列搬到全連線佇列尾部,即 accept queue (伺服器埠狀態為:ESTABLISHED)。
    在Linux核心2.2之後,分離為兩個backlog來分別限制半連線(SYN_RCVD狀態)佇列大小和全連線(ESTABLISHED狀態)佇列大小。 詳細:https://www.cnblogs.com/Orgliny/p/5780796.html