NIO實現TCP的非阻塞通訊
阿新 • • 發佈:2018-12-30
這一次寫NIO實現非阻塞通訊時遇到了很多問題,我所理解的非阻塞是對於一個使用者而言它的讀寫不會相互制約,而在此次編寫過程中,發現其實非阻塞是相對於多個使用者而言的。
看到網上一個對同步非同步阻塞非阻塞的例子,感覺挺好的,就拷過來了:
老張愛喝茶,廢話不說,煮開水。
出場人物:老張,水壺兩把(普通水壺,簡稱水壺;會響的水壺,簡稱響水壺)。
1 老張把水壺放到火上,立等水開。(同步阻塞)
老張覺得自己有點傻
2 老張把水壺放到火上,去客廳看電視,時不時去廚房看看水開沒有。(同步非阻塞)
老張還是覺得自己有點傻,於是變高端了,買了把會響笛的那種水壺。水開之後,能大聲發出嘀~~~~的噪音。
3 老張把響水壺放到火上,立等水開。(非同步阻塞)
老張覺得這樣傻等意義不大
4 老張把響水壺放到火上,去客廳看電視,水壺響之前不再去看它了,響了再去拿壺。(非同步非阻塞)
老張覺得自己聰明瞭。
所謂同步非同步,只是對於水壺而言。
普通水壺,同步;響水壺,非同步。
雖然都能幹活,但響水壺可以在自己完工之後,提示老張水開了。這是普通水壺所不能及的。
同步只能讓呼叫者去輪詢自己(情況2中),造成老張效率的低下。
所謂阻塞非阻塞,僅僅對於老張而言。
立等的老張,阻塞;看電視的老張,非阻塞。
情況1和情況3中老張就是阻塞的,媳婦喊他都不知道。雖然3中響水壺是非同步的,可對於立等的老張沒有太大的意義。所以一般非同步是配合非阻塞使用的,這樣才能發揮非同步的效用。
——來源網路,作者不明。
作者:愚抄
連結:https://www.zhihu .com/question/19732473/answer/23434554
來源:知乎
著作權歸作者所有,轉載請聯絡作者獲得授權。
使用的類:
1.ServerSocketChannel類:ServerSocket的代替類,支援非阻塞通訊;
2.SocketChannel類:Socket的代替類,支援非阻塞通訊;
3.Selector:為ServerSocketChannel監控接收連線就緒事件,為SocketChannel監控連線就緒,讀就緒和寫就緒事件。
4.SelectionKey:代表ServerSocketChannel和SocketChannel向Selector註冊事件的控制代碼。
基本思路:
伺服器端:
1.使用ServerSocketChannel類對地址埠號進行繫結,通過open()方法獲取通道,並且設定為非阻塞;
2.通過Selector的open()方法,建立Selector物件;
3.進行註冊連線就緒事件;
3.通過Selector的select()方法返回註冊事件的數目,返回物件為SelectionKey,對SelectionKey進行遍歷,當監聽到連線事件的時候,就使用accept()連線並返回一個SocketChannel物件,對它註冊讀就緒事件;
4.收到訊息之後進行反饋;
客戶端:
1.通過IP和埠號進行連線;
2.通過Selector的open()方法建立該類物件;
3.進行註冊讀事件監聽,讀取伺服器端的反饋;
4.輸出訊息;
程式碼:
伺服器端:
package sency.lay.two;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* 伺服器端
*
* @author sency
*
*/
public class NServer {
private ServerSocketChannel serverSocketChannel;
private Selector selector;
private int port = 8000;
// 用於編碼解碼物件
private Charset charse = Charset.forName("GBK");
public static SelectionKey key = null;
public NServer() throws IOException {
// 建立Selector物件
selector = Selector.open();
// 開啟一個伺服器端通道
serverSocketChannel = ServerSocketChannel.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port);
serverSocketChannel.bind(isa);
// 設定為非阻塞模式
serverSocketChannel.configureBlocking(false);
// 為伺服器通道註冊連線就緒監聽事件
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
System.out.println("=======Welcome!!!=======");
}
public static void main(String args[]) throws IOException {
new NServer().service();
}
private void service() throws IOException {
// TODO Auto-generated method stub
// 通過select方法選擇一組鍵,其相應的通道已為 I/O操作準備就緒
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
key = it.next();
// 刪除正在處理的key
it.remove();
if (key.isAcceptable()) {
SocketChannel sc = serverSocketChannel.accept();
if (sc.isConnected()) {
System.out.println("連線到:"
+ sc.socket().getInetAddress() + "--"
+ sc.socket().getPort());
}
// 設定為非阻塞模式
sc.configureBlocking(false);
// 為返回的SocketChannel物件註冊讀就緒和寫就緒監聽
sc.register(selector, SelectionKey.OP_READ);
}
if (key.isReadable()) {
try {
readMsg(key);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
}
private void sendEcho(SelectionKey key,String content) throws IOException {
if(content.length()>0){
SocketChannel sc = (SocketChannel) key.channel();
String echo = "#ServerEcho:"+content+"收到!";
sc.write(charse.encode(echo));
System.out.println(echo);
}
}
private void readMsg(SelectionKey key) throws IOException {
// TODO Auto-generated method stub
// 獲取該key所對應的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 建立一個ByteBuffer用於存放讀取的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
String content = "";
try {
while (socketChannel.read(buffer) > 0) {
// 將極限設定為當前位置,再將當前位置設定為0
buffer.flip();
// 對讀取取內容進行解碼
content += charse.decode(buffer);
}
System.out.println("#Client:" + content);
// 將key對應的Channel設定為準備下一次讀取
key.interestOps(SelectionKey.OP_READ);
buffer.clear();
} catch (IOException e) {
key.cancel();
if (key.channel() != null) {
key.channel().close();
}
}
sendEcho(key,content);
}
}
客戶端:
package sency.lay.two;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Scanner;
import java.util.Set;
/**
* 客戶端
*
* @author sency
*
*/
public class NClient {
private SocketChannel socketChannel;
private Selector selector;
private int port = 8000;
private Charset charse = Charset.forName("GBK");
public SelectionKey key = null;
public NClient() throws IOException {
selector = Selector.open();
// 建立SocketChannel
InetSocketAddress isa = new InetSocketAddress("127.0.0.1", port);
socketChannel = SocketChannel.open();
socketChannel.connect(isa);
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_READ);
}
public static void main(String args[]) throws IOException {
System.out.println("=======客戶端=======");
new NClient().talk();
}
private void talk() throws IOException {
Scanner scan=new Scanner(System.in);
new MyThread().start();
while(scan.hasNextLine()){
//讀取鍵盤的輸入
String line=scan.nextLine();
//將鍵盤的內容輸出到SocketChanenel中
socketChannel.write(charse.encode(line));
System.out.println("#Client:"+line);
}
}
class MyThread extends Thread{
public void run() {
// TODO Auto-generated method stub
try {
while (selector.select() > 0) {
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
key = it.next();
it.remove();
// TODO Auto-generated method stub
if (key.isReadable()) {
try {
receiveMsg(key);
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
private void receiveMsg(SelectionKey key) throws IOException {
// TODO Auto-generated method stub
// 獲取該key所對應的SocketChannel
SocketChannel socketChannel = (SocketChannel) key.channel();
// 建立一個ByteBuffer用於存放讀取的資料
ByteBuffer buffer = ByteBuffer.allocate(1024);
String content = "";
try {
while (socketChannel.read(buffer) > 0) {
// 將極限設定為當前位置,再將當前位置設定為0
buffer.flip();
// 對讀取取內容進行解碼
content += charse.decode(buffer);
}
System.out.println(content);
// 將key對應的Channel設定為準備下一次讀取
key.interestOps(SelectionKey.OP_READ);
} catch (IOException e) {
if (key != null) {
key.cancel();
key.channel().close();
}
}
}
}
水平太低啦,大家不要介意,努力學習ing!