1. 程式人生 > >Java NIO用法詳解

Java NIO用法詳解

election 線程 tro conn .com 一次 關註 運行 bind

原文: https://my.oschina.net/zhangxufeng/blog/3048735

對於Java NIO,其主要由三個組件組成:Channel、Selector和Buffer。關於這三個組件的作用主要如下:

Channel是客戶端連接的一個抽象,當每個客戶端連接到服務器時,服務器都會為其生成一個Channel對象;
Selector則是Java NIO實現高性能的關鍵,其本質上使用了IO多路復用的原理,通過一個線程不斷的監聽多個Channel連接來實現多所有這些Channel事件進行處理,這樣的優點在於只需要一個線程就可以處理大量的客戶端連接,當有客戶端事件到達時,再將其分發出去交由其它線程處理;

Buffer從字面上講是一個緩存,本質上其是一個字節數組,通過Buffer,可以從Channel上讀取數據,然後交由下層的處理器進行處理。這裏的Buffer的優點在於其封裝了一套非常簡單的用於讀取和寫入數據Api。
關於Channel和Selector的整體結構,可以通過下圖進行的理解,這也是IO多路復用的原理圖:技術分享圖片

可以看到,對於每個Channel對象,其只要註冊到Selector上,那麽Selector上監聽的線程就會監聽這個Channel的事件,當任何一個Channel有對應的事件到達時,Selector就會將該事件分發到下層的應用進行處理。

本文首先會對Channel,Selector和Buffer的主要Api進行講解,然後會結合一個服務器與客戶端的例子來具體講解它們三者的使用方式。

  1. 核心Api
    1.1 Channel
    對於Channel,其主要的api如下:

// 服務器端:
// 用於創建一個供服務器使用的ServerSocketChannel實例
ServerSocketChannel.open();
// 綁定一個服務器端口,從而提供對外的服務
ServerSocketChannel.bind();
// 獲取一個客戶端的Channel連接
ServerSocketChannel.accept();

// 客戶端:
// 用於創建一個供客戶端使用的SocketChannel實例
SocketChannel.open();
// 連接參數中指定地址和端口對應的服務器
SocketChannel.connect();

// ServerSocketChannel和SocketChannel兩者兼備的方法
// 用於指定服務器處理請求的方式是阻塞的還是非阻塞的,對於Java NIO都是以非阻塞的方式進行處理的
Channel.configureBlocking();
// 將當前channel註冊到一個Selector上,該方法會返回註冊之後得到的SelectionKey對象。
// 這裏在註冊Channel的時候可以選擇Selector將關註該Channel的哪些事件,可選的有如下幾種:
// SelectionKey.OP_CONNECT:監聽Channel建立連接事件
// SelectionKey.OP_READ:監聽Channel的可讀取事件,也即客戶端已經發送數據過來,此時可以讀取
// SelectionKey.OP_WRITE:監聽Channel的可寫事件,即當前可以寫入數據到Channel中
Channel.register(Selector, int);
1.2 Selector
對於Selector,其主要的api如下:

// 創建一個Selector實例
Selector.open();
// 監聽所有註冊的Channel,一直阻塞知道有任何一個客戶端Channel有相應的事件到達,
// 需要註意的是,這裏的select()方法返回的是當前接收到是事件數目,而不是具體的事件,
// 具體的事件要通過selectedKeys()方法獲取
Selector.select();
// 獲取當前所有有事件到達的客戶端Channel對應的SelectionKey實例
Selector.selectedKeys();
1.3 SelectionKey
// 判斷當前收到的Channel的事件是否為OP_CONNECT事件
SelectionKey.isConnectable();
// 判斷當前收到的Channel的事件是否為OP_READ事件
SelectionKey.isReadable();
// 判斷當前收到的Channel的事件是否為OP_WRITE事件
SelectionKey.isWritable();
// 返回當前SelectionKey中所封裝的Channel對象
SelectionKey.channel();
從上面的API中可以看出,這裏關於Channel處理的大致流程是,首先由SocketChannel或者ServerSocketChannel調用open()方法創建一個Channel對象;然後調用Channel.register()方法將當前Channel註冊到Selector中;接著通過Selector.select()方法監聽所有註冊的Channel的連接,如果有任何一個有事件到達,此時這些事件會封裝到當前客戶端Channel對應的SelectionKey中,最後通過SelectionKey判斷具體是什麽類型的事件,然後對這些事件進行處理。

  1. 用法示例
    2.1 服務器
    服務器端使用的是ServerSocketChannel,這裏主要是通過監聽客戶端Channel,獲取數據進行打印,然後返回一段數據給客戶端。如下是具體的示例:

public class Server {

public static void main(String[] args) throws IOException {
new Server().start();
}

private void start() throws IOException {
// 創建一個服務器ServerSocketChannel對象
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
// 將當前服務器綁定到8080端口
serverSocketChannel.bind(new InetSocketAddress(8080));
// 設置當前channel為非阻塞的模式
serverSocketChannel.configureBlocking(false);

// 創建一個Selector對象
Selector selector = Selector.open();
// 將服務器ServerSocketChannel註冊到Selector上,並且監聽與客戶端建立連接的事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
  // 監聽ServerSocketChannel上的事件,每秒鐘循環一次,
  // 這裏select()方法返回的是當前監聽得到的事件數目,為0表示當前沒有任何事件到達
  if (selector.select(1000) == 0) {
    System.out.println("has no message...");
    continue;
  }

  // 走到這裏說明當前有監聽的事件到達,獲取所有監聽的Channel所對應的SelectionKey對象,
  // 這裏需要註意的是,前面我們已經將ServerSocketChannel註冊到Selector中了,
  // 因而對於ServerSocketChannel,其監聽得到的則是SelectionKey.OP_CONNECT事件。
  // 但是下面的代碼中,我們也會將與客戶端建立的連接Channel註冊到Selector中,
  // 因而這裏Selector中也會存在接收到的SelectionKey.OP_READ和OP_WRITE事件。
  Set<SelectionKey> selectionKeys = selector.selectedKeys();
  // 對監聽到的事件進行遍歷
  Iterator<SelectionKey> iterator = selectionKeys.iterator();
  while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    // 這裏需要註意的是,Selector在為每個有事件到達的Channel建立SelectionKey對象
    // 之後,其並不會將其移除,如果我們不進行移除,那麽下次循環時該事件還會再被處理一次,
    // 因而這裏要調用remove()方法移除該SelectionKey
    iterator.remove();

    // 如果是有新的客戶端Channel連接建立,則處理該事件
    if (key.isAcceptable()) {
      accept(key, selector);
    }

    // 如果客戶端連接中有可讀取的數據,則處理該事件
    if (key.isReadable()) {
      read(key);
    }

    // 如果可往客戶端連接中寫入數據,則處理該事件
    if (key.isValid() && key.isWritable()) {
      write(key);
    }
  }
}

}

private void accept(SelectionKey key, Selector selector) throws IOException {
// 這裏由於只有ServerSocketChannel才會有客戶端連接建立事件,因而這裏可以直接將
// Channel強轉為ServerSocketChannel對象
ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
// 獲取客戶端的連接
SocketChannel socketChannel = serverChannel.accept();
socketChannel.configureBlocking(false);
// 將客戶端連接Channel註冊到Selector中,並且監聽該Channel的OP_READ事件,
// 也即等待客戶端發送數據到服務器端
socketChannel.register(selector, SelectionKey.OP_READ);
}

private void read(SelectionKey key) throws IOException {
// 這裏只有客戶端才會發送數據到服務器,因而可將其強轉為SocketChannel對象
SocketChannel clientChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
// 從客戶端Channel中讀取數據,這裏read()方法返回讀取到的數據長度,
// 如果為-1,則表示客戶端斷開連接了
int len = clientChannel.read(buffer);
if (len == -1) {
clientChannel.close();
return;
}

// 處理客戶端數據
System.out.println("**********server: read message**********");
System.out.println(new String(buffer.array(), 0, len));
// 由於已經讀取了客戶端數據,因而這裏將對該Channel感興趣的事件修改為
// SelectionKey.OP_READ 和OP_WRITE,用於服務器往該Channel中寫入數據
key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);

}

private void write(SelectionKey key) throws IOException {
String message = "message from server";
ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
// 由於上面為客戶端Channel設置了可供寫入數據的事件,因而這裏可以往客戶端Channel寫入數據
SocketChannel clientChannel = (SocketChannel) key.channel();

if (clientChannel.isOpen()) {
  System.out.println("**********server: write message**********");
  System.out.println(message);
  // 往客戶端Channel寫入數據
  clientChannel.write(buffer);
}

// 寫入完成後,監聽客戶端會繼續發送的數據
if (!buffer.hasRemaining()) {
  key.interestOps(SelectionKey.OP_READ);
}

buffer.compact();

}
}
2.2 客戶端
客戶端使用SocketChannel連接服務器,並且會往服務器中寫入數據,然後等待服務器返回數據並且打印出來。如下是客戶端代碼:

public class Client {

public static void main(String[] args) throws IOException {
new Client().start();
}

private void start() throws IOException {
// 創建一個客戶端SocketChannel對象
SocketChannel channel = SocketChannel.open();
// 設置客戶端Channel為非阻塞模式
channel.configureBlocking(false);

// 創建一個供給客戶端使用的Selector對象
Selector selector = Selector.open();
// 註冊客戶端Channel到Selector中,這裏客戶端Channel首先監聽的是OP_CONNECT事件,
// 因為其首先必須與服務器建立連接,然後才能發送和讀取數據
channel.register(selector, SelectionKey.OP_CONNECT);
// 調用客戶端Channel.connect()方法連接服務器,需要註意的是,該方法的調用必須放在
// 上述Channel.register()方法之後,否則在註冊之前客戶端就已經註冊完成,
// 此時Selector就無法收到SelectionKey.OP_CONNECT事件了
channel.connect(new InetSocketAddress("127.0.0.1", 8080));
while (true) {
  // 監聽客戶端Channel的事件,這裏會一直等待,直到有監聽的事件到達。
  // 對於客戶端,首先監聽到的應該是SelectionKey.OP_CONNECT事件,
  // 然後在後續代碼中才會將SelectionKey.OP_READ和WRITE事件註冊
  // 到Selector中
  selector.select();
  Set<SelectionKey> selectionKeys = selector.selectedKeys();
  Iterator<SelectionKey> iterator = selectionKeys.iterator();
  while (iterator.hasNext()) {
    SelectionKey key = iterator.next();
    iterator.remove();

    // 監聽到客戶端Channel的SelectionKey.OP_CONNECT事件,並且處理該事件
    if (key.isConnectable()) {
      connect(key, selector);
    }

    // 監聽到客戶端Channel的SelectionKey.OP_WRITE事件,並且處理該事件
    if (key.isWritable()) {
      write(key, selector);
    }

    // 監聽到客戶端Channel的SelectionKey.OP_READ事件,並且處理該事件
    if (key.isReadable()) {
      read(key);
    }
  }
}

}

private void connect(SelectionKey key, Selector selector) throws IOException {
// 由於是客戶端Channel,因而可以直接強轉為SocketChannel對象
SocketChannel channel = (SocketChannel) key.channel();
channel.finishConnect();
// 連接建立完成後就監聽該Channel的WRITE事件,以供客戶端寫入數據發送到服務器
channel.register(selector, SelectionKey.OP_WRITE);
}

private void write(SelectionKey key, Selector selector) throws IOException {
SocketChannel channel = (SocketChannel) key.channel();
String message = "message from client";
System.out.println("**client: write message**");
System.out.println(message);
// 客戶端寫入數據到服務器Channel中
channel.write(ByteBuffer.wrap(message.getBytes()));
// 數據寫入完成後,客戶端Channel監聽OP_READ事件,以等待服務器發送數據過來
channel.register(selector, SelectionKey.OP_READ);
}

private void read(SelectionKey key) throws IOException {
System.out.println("**client: read message**");
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.wrap(new byte[1024]);
// 接收到客戶端Channel的SelectionKey.OP_READ事件,說明服務器發送數據過來了,
// 此時可以從Channel中讀取數據,並且進行相應的處理
int len = channel.read(buffer);
if (len == -1) {
channel.close();
return;
}

System.out.println(new String(buffer.array(), 0, len));

}
}
2.3 運行結果
服務器:

has no message...
has no message...
has no message...
has no message...
**server: read message**
message from client
**server: write message**
message from server
客戶端:

**client: write message**
message from client
**client: read message**
message from server

  1. 小結
    本文首先講解了Java NIO中三大組件的作用,然後講解了各個組件主要的方法及其註意事項,最後通過一個客戶端和服務器實例詳細講解了Java NIO是如何使用的

Java NIO用法詳解