1. 程式人生 > >【Java TCP/IP Socket程式設計】----NIO----TCP通道

【Java TCP/IP Socket程式設計】----NIO----TCP通道

NIO介紹

基本Java套接字對於小規模系統可以很好執行,涉及同時有上千個客戶端,就會出現問題,其中一客戶一執行緒的方式線上程的建立,維護和切換需要系統開銷較大,而使用執行緒池的方式雖然節省了一定的系統開銷,但是對於連線生存期比較長的協議,執行緒池的大小限制了系統可以同時服務的客戶端總數。隨著執行緒池數量增加,帶來了更多的執行緒處理開銷,而不能提升系統的效能。此外執行緒很難做到以下方面:

1)要保證某些連線優先獲取服務,或想要指定一定的服務順序,執行緒可能就很難做到。

2)不同客戶端對伺服器端的共享資訊(狀態)同步訪問和修改,需要使用鎖機制或者其他互斥機制對依次訪問的狀態進行嚴格的同步。同時需要考慮多執行緒伺服器的正確性和高效性就變得非常困難。

NIO就可以解決上面的問題,並且通過輪詢一組客戶端的方法,來查詢需要服務的客戶端。NIO主要包括兩個部分:java.nio.channels包下面的Selector(選擇器)和Channel(通道),java.nio包下面的Buffer(緩衝區)抽象。

1)Channel類似於流,一個Channel例項代表了一個可輪詢的IO目標,如套接字(或者檔案,裝置),需要將要監控的通道Channel註冊到Selector例項上,然後呼叫選擇器的select()方法,該方法會一直阻塞,直到一個或者多個通道準備好了IO操作(接收,讀,寫)或者等待超時。

2)Buffer是一個有限容量的資料容器,主要用來從Channel中讀取資料或者傳送資料到Channel中,其本質是一個數組,使用Buffer的好處。第一,與讀寫緩衝區資料相關聯的系統開銷暴露了出來,如緩衝區空間不足,但需要存入資料時,需要操作獲取空間,你可以控制發生的時間,如何發生以及是否發生。第二,一些對Java物件的特殊Buffer對映操作能夠直接操作底層平臺的資源,這些操作節省了再不同地址空間中複製資料的開銷。

使用選擇器具體步驟如下:

1.建立一個Selector例項

2.將要檢測的通道註冊到選擇上面,並同時制定通道上感興趣的IO操作。

3.重複執行。

    1)呼叫一種select()方法。

    2)獲取選擇的鍵的列表。

    3)對於已選擇鍵集中的每個鍵。

      a.獲取通道,並從鍵中獲取附件(如果合適的話)

      b.確定準備就緒的操作並執行。如果是accept操作,將接受的通道設定為非阻塞模式,並將其註冊到選擇器上。

      c.如果需要,修改鍵的興趣操作集。

      d.從已經選擇鍵中移除鍵。

NIO注意點

1.在NIO中資料的讀寫操作始終與緩衝區關聯,Channel將資料讀入緩衝區,然後我們從緩衝區訪問資料。寫資料也是將要傳送的資料按順序填入緩衝區。

2.緩衝區提供了Java基本資料型別中除布林型別以外的檢視,資料真正儲存地方是ByteBuffer,其他如CharBuffer只是將ByteBuffer解釋成其他基本型別。

3.Buffer比較常見的方法flip()和hasRemaining(),其中flip()是用於緩衝區寫入模式切換到讀取模式(將緩衝區當前置為0,從頭開始讀取資料),hasRemaining()方法表示的是緩衝區是否還有資料。

4.Channel和Selector配置使用時,必須先將Channel註冊到Selector上面,且要將Channel設定成非阻塞模式。Selector可以對通道,Selector可以監聽通道四種不同的事件:Connect,Accept,Read,Write;

案例

public class TCPEchoClientNonBlocking {
  private static final String SERVER = "127.0.0.1";
  private static final int PORT = 1001;

  public static void main(String[] args) throws IOException {
    byte[] argument = "Hello ServerSocketChannel".getBytes();
    SocketChannel socketChannel = SocketChannel.open();
    socketChannel.configureBlocking(false);
    //判斷是否已經完成連線
    if (!socketChannel.connect(new InetSocketAddress(SERVER, PORT))) {
      while (!socketChannel.finishConnect()) {
        System.out.println(".");
      }
    }
    ByteBuffer writeBuffer = ByteBuffer.wrap(argument);
    ByteBuffer readBuffer = ByteBuffer.allocate(argument.length);
    //接收到的總的位元組數
    int totalBytesRcvd = 0;
    //呼叫一次read()方法返回的位元組數
    int byteRcvd;
    while (totalBytesRcvd < argument.length) {
      if (writeBuffer.hasRemaining()) {
        socketChannel.write(writeBuffer);
      }
      if ((byteRcvd = socketChannel.read(readBuffer)) == -1) {
        throw new SocketException("Connetion closed prematurely");
      }
      totalBytesRcvd += byteRcvd;
      System.out.println(".");
    }
    //列印接收到的位元組數
    System.out.println("Received :" + new String(readBuffer.array(), 0, totalBytesRcvd));
    socketChannel.close();
  }
}
public class TCPServerSelector {
  private static final int BUFSIZE = 256;
  private static final int TIMEOUT = 300;

  public static void main(String[] args) throws IOException {
    String[] ports = "1001,1002,1003,1004".split(",");
    //建立選擇器例項
    Selector selector = Selector.open();
    for (String port : ports) {
      //建立通道,並將通道設定為非阻塞狀態,註冊到選擇器上面
      ServerSocketChannel listenChannel = ServerSocketChannel.open();
      listenChannel.bind(new InetSocketAddress(Integer.parseInt(port)));
      listenChannel.configureBlocking(false);
      listenChannel.register(selector, SelectionKey.OP_ACCEPT);
    }
    TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
    while (true) {
      //等待通道準備好
      if (selector.select(TIMEOUT) == 0) {
        System.out.println(".");
        continue;
      }
      //selectionKeys返回的集合中包含每個準備好某一IO操作的通道SelectionKey(註冊時建立)
      Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
      while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        //通道感興趣操作是接收操作
        if (key.isAcceptable()) {
          protocol.handleAccept(key);
        }
        //通道感興趣的操作是讀取操作
        if (key.isReadable()) {
          protocol.handleRead(key);
        }
        //通道感興趣的操作是寫入操作(向客戶端)
        if (key.isValid() && key.isWritable()) {
          protocol.handleWrite(key);
        }
        //從鍵集合中移除已經處理後的鍵
        iterator.remove();
      }
    }
  }
}
public interface TCPProtocol {
  void handleAccept(SelectionKey key) throws IOException;

  void handleRead(SelectionKey key) throws IOException;

  void handleWrite(SelectionKey key) throws IOException;
}
public class EchoSelectorProtocol implements TCPProtocol {
  private int bufSize;

  public EchoSelectorProtocol(int bufSize) {
    this.bufSize = bufSize;
  }

  @Override
  //服務端處理客戶端的連線
  public void handleAccept(SelectionKey key) throws IOException {
    SocketChannel clientChannel = ((ServerSocketChannel) key.channel()).accept();
    clientChannel.configureBlocking(false);
    //通道註冊到選擇器上,附件是快取區
    clientChannel.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(bufSize));
  }

  @Override
  //伺服器端從通道中讀取資料
  public void handleRead(SelectionKey key) throws IOException {
    SocketChannel clientChannel = (SocketChannel) key.channel();
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    long bytesRead = clientChannel.read(buffer);
    if (bytesRead == -1) {
      clientChannel.close();
    } else if (bytesRead > 0) {
      key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    }
  }

  @Override
//將資料寫到客戶端中
  public void handleWrite(SelectionKey key) throws IOException {
    ByteBuffer buffer = (ByteBuffer) key.attachment();
    buffer.flip();
    SocketChannel clientChannel = (SocketChannel) key.channel();
    clientChannel.write(buffer);
    if (!buffer.hasRemaining()) {
      key.interestOps(SelectionKey.OP_READ);
    }
    buffer.compact();
  }

}