NIO學習之Selector,SelectionKey與客戶端與服務端通訊簡單實現(1)
選擇器(Selector)的 作用:將通道感興趣的事件放入佇列中,而不是馬上提交給應用程式,等已註冊的通道自己來請求處理這些事件。換句話說,就是選擇器將會隨時報告已經準備好了的通道,而且是按照先進先出的順序。
open()
方法,靜態方法,用於獲取1個Selector物件
keys()
方法,用於獲取所有註冊到Selector物件上的SelectionKey
selectedKeys()
方法,用於獲取已註冊通道發出的準備好了的SelectionKey
select()
方法,呼叫此方法的執行緒會一直阻塞,直到註冊的通道中關注的事件發生了才會返回。 例:服務端的ServerSocketChannel
OP_ACCEPT
事件,如果沒有客戶端連線服務端,那麼服務端將一直阻塞在呼叫select()
方法的地方
select(long)
方法,此方法與不帶引數的差不多,引數的含義是當阻塞的時間超過給定的時間引數,該方法將返回繼續執行
wakeup()
方法,會喚醒當前被阻塞的執行緒,使其 select() 立即返回;如果當前執行緒沒有阻塞,那麼執行了wakeUp() 方法之後,下一個執行緒的 select() 方法會被立即返回,不再被阻塞下去。
close()
方法,能夠關閉當前的選擇器。當一個執行緒當前呈阻塞狀態,那麼中止這種狀態需要先呼叫Selector的 wakeUp()
方法。close()方法在實現類中先呼叫wakeUp()
選擇鍵(SelectionKey)的作用是表明哪個通道已經做好了相應的準備,相應的準備指SelectionKey
類裡定義的OP_READ
,OP_WRITE
,OP_CONNECT
,OP_ACCEPT
中某個某個事件發生了。Selector不斷輪詢是否有事件準備好了,如果有事件準備好了則獲取事件相應的SelectionKey,進入事件處理
SelectionKey主要說兩點:
attach(Object)
主要用於繫結一個物件(物件可以任意型別attachment()
方法用於獲取繫結的物件
interestOps()
方法用於獲取通道“感興趣”的操作或事件
說了Selector 與 SelectionKey 的基礎知識及主要方法的使用,現在就來實踐一下。
服務端
虛擬碼
1.開啟1個ServerSocketChannel通道
2.把ServerSocketChannel設定為非阻塞的
3.為ServerSocketChannel通道繫結ip地址與埠
4.獲取Selector物件
5.把ServerSocketChannel通道感興趣的事件註冊到Selector物件裡
6.監聽埠,等待已註冊的通道關注的事件觸發
7.獲取事件觸發的通道進行相應的處理
public class NIOServer {
private Selector selector;
public void initServer(int port) throws IOException{
// 開啟ServerSocket通道
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.socket().bind(new InetSocketAddress(port));
// 獲取一個選擇器
this.selector = Selector.open();
// 將通道管理器與該通道進行繫結,併為該通道註冊SelectionKey.OP_ACCEPT事件
// 註冊事件後,當該事件觸發時會使selector.select()返回,
// 否則selector.select()一直阻塞
serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT);
}
public void listen() throws IOException{
System.out.println("啟動伺服器!");
while (true) {
// select()方法一直阻塞直到有註冊的通道準備好了才會返回
selector.select();
Iterator<?> iterator = selector.selectedKeys().iterator();
while (iterator.hasNext()) {
SelectionKey key = (SelectionKey) iterator.next();
// 刪除已選的key,防止重複處理
iterator.remove();
handler(key);
}
}
}
public void handler(SelectionKey key)throws IOException{
if (key.isAcceptable()) {
handlerAccept(key);
}else if (key.isReadable()){
handlerRead(key);
}else if (key.isWritable()){
System.out.println("can write!");
}else if (key.isConnectable()){
System.out.println("is connectable");
}
}
public void handlerAccept(SelectionKey key) throws IOException{
// 從SelectionKey中獲取ServerSocketChannel
ServerSocketChannel server = (ServerSocketChannel) key.channel();
// 獲取SocketChannel
SocketChannel socketChannel = server.accept();
// 設定成非阻塞
socketChannel.configureBlocking(false);
System.out.println("與客戶端建立連線");
// 為socketChannel通道建立 OP_READ 讀操作,使客戶端傳送的內容可以被讀到
socketChannel.register(selector, SelectionKey.OP_READ);
// 往客戶端傳送傳送資訊
socketChannel.write(ByteBuffer.wrap("connected\n".getBytes()));
}
public void handlerRead(SelectionKey key)throws IOException{
SocketChannel socketChannel = (SocketChannel) key.channel();
// 建立讀取緩衝區
ByteBuffer byteBuffer = ByteBuffer.allocate(512);
// 從通道讀取可讀取的位元組數
try {
int readcount = socketChannel.read(byteBuffer);
if (readcount > 0) {
byte[] data = byteBuffer.array();
String msg = new String(data);
System.out.println("服務端收到的資訊為:\n" + msg);
ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes());
socketChannel.write(outBuffer);
} else {
System.out.println("客戶端異常退出");
}
} catch (IOException e) {
System.out.println("異常資訊:\n" + e.getMessage());
key.cancel();
}
}
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.initServer(8888);
server.listen();
}
}
客戶端
public class NIOClient1 {
private Selector selector;
private SocketChannel socketChannel;
private SocketAddress address;
public NIOClient1(String host, int port) throws IOException {
address = new InetSocketAddress(host, port);
selector = Selector.open();
socketChannel = SocketChannel.open(address);
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
public static void main(String[] args) {
Scanner scanner = new Scanner(System.in);
System.out.print("輸入伺服器ip地址:\t");
String host = scanner.next();
System.out.print("輸入埠號:\t");
int port = scanner.nextInt();
try {
NIOClient client = new NIOClient(host, port);
Selector selector = client.getSelector();
String msg = scanner.next();
while (true) {
if (msg.equals("q")) {
break;
}
client.getSocketChannel().write(ByteBuffer.wrap(msg.getBytes()));
if (selector.select() > 0) {
Set<SelectionKey> selectionKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectionKeys.iterator();
while (iterator.hasNext()) {
SelectionKey key = iterator.next();
if (key.isReadable()) {
SocketChannel socketChannel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int size = socketChannel.read(buffer);
if (size > 0) {
buffer.flip();
byte[] data = new byte[size];
buffer.get(data);
System.out.println("來自服務端的訊息:\t" + new String(data));
}
}
iterator.remove();
}
}
msg = scanner.next();
}
} catch (IOException e) {
e.printStackTrace();
}
}
public Selector getSelector() {
return selector;
}
public SocketChannel getSocketChannel() {
return socketChannel;
}
public SocketAddress getAddress() {
return address;
}
public void setSelector(Selector selector) {
this.selector = selector;
}
public void setSocketChannel(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
}
public void setAddress(SocketAddress address) {
this.address = address;
}
}
執行效果
1. 先啟動服務端
2. 啟動客戶端