使用NIO實現非阻塞Socket通訊原理
阿新 • • 發佈:2019-01-10
剛學了NIO,寫一下自己的理解
網路通訊中,NIO提供了SocketChannel和ServerSocketChannel兩種不同的套接字通道來實現,可以設定阻塞與非阻塞兩種模式,為了實現高負載高併發都採取非阻塞的模式。通道是雙向的,可以同時在通道上傳送和讀取資料。NIO採用可分配大小的緩衝區Buffer實現對資料的讀寫操作。
伺服器僅採用一個執行緒去處理所有的客戶端執行緒,這就需要建立一個Selector,將ServerSocketChannel和想要監控的SocketChannel註冊到Selector中(用SelectableChannel的register方法,該方法返回一個這個channel向Selector註冊的鍵,是一個SelectionKey例項,它包裝了SelectableChannel和該通道感興趣的操作)。
Selector就像一個觀察者,不斷地獲取Selector的select方法的返回值,返回值是準備就緒的SelectionKey的數目,然後就進行處理(通過Selector的selectedKeys方法返回被選擇的SelectionKey集合,然後處理連線請求和讀取資料)。
每個客戶端只有一個SocketChannel,將該SocketChannel註冊到指定的Selector後,監聽該Selector即可。如果監聽到該Selector的select方法的返回值大於0,表明該Selector上有需要進行IO處理的SelectionKey,獲取到SocketChannel後即可處理請求和資料。
貼一段練習的程式碼:
import java.io.*;
import java.nio.*;
import java.nio.channels.*;
import java.nio.charset.*;
import java.net.*;
import java.util.*;
class NServer
{
private Selector selector = null;
static final int PORT = 30000;
private Charset charset = Charset.forName("UTF-8");
public void init() throws IOException
{
selector = Selector.open();
ServerSocketChannel server = ServerSocketChannel.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT);
server.bind(isa);
//設定ServerSocket以非阻塞方式工作
server.configureBlocking(false);
//將server註冊到指定的Selector物件
server.register(selector,SelectionKey.OP_ACCEPT);
while(selector.select()>0)
{
//依次處理每個已選擇的SelectionKey
for(SelectionKey sk : selector.selectedKeys())
{
selector.selectedKeys().remove(sk);
//如果sk對應的Channel包含客戶端的連線請求
if(sk.isAcceptable())
{
SocketChannel sc = server.accept();
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
//將sk對應的Channel設定成準備接受其他請求
sk.interestOps(SelectionKey.OP_ACCEPT);
}
//如果sk對應的Channel有資料需要讀取
if(sk.isReadable())
{
//獲取該SelectionKey對應的Channel
SocketChannel sc = (SocketChannel)sk.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
try
{
while(sc.read(buff)>0)
{
buff.flip();
content += charset.decode(buff);
}
System.out.println("讀取的資料:"+ content);
sk.interestOps(SelectionKey.OP_READ);
}
//如果捕獲到帶sk對應的Channel出現了異常,即表明該Channel對應的Client出現了問題,
//所以從Selector中取消sk的註冊
catch (IOException e)
{
sk.cancel();
if(sk.channel() != null)
sk.channel().close();
}
//將收到的資料發給所有註冊的SelectionKey
if(content.length()>0)
{
for(SelectionKey key : selector.keys())
{
Channel targetChannel = key.channel();
if(targetChannel instanceof SocketChannel)
{
SocketChannel dest = (SocketChannel)targetChannel;
dest.write(charset.encode(content));
}
}
}
}
}
}
}
public static void main(String[] args) throws IOException
{
new NServer().init();
}
}
class NClient
{
private Selector selector = null;
static final int PORT = 30000;
private Charset charset = Charset.forName("UTF-8");
private SocketChannel sc = null;
public void init() throws IOException
{
selector = Selector.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT);
sc = SocketChannel.open(isa);
sc.configureBlocking(false);
sc.register(selector,SelectionKey.OP_READ);
//啟動讀取服務端資料的執行緒
new ClientThread().start();
//建立鍵盤輸入流
Scanner scan = new Scanner(System.in);
while (scan.hasNextLine())
{
String line = scan.nextLine();
sc.write(charset.encode(line));
}
}
//定義讀取服務端資料的執行緒
private class ClientThread extends Thread
{
public void run()
{
try
{
while (selector.select()>0)
{
//遍歷每個有可用IO操作的Channel對應的SelectionKey
for(SelectionKey sk : selector.selectedKeys())
{
selector.selectedKeys().remove(sk);
if(sk.isReadable())
{
//使用NIO讀取Channel中的資料
SocketChannel sc = (SocketChannel)sk.channel();
ByteBuffer buff = ByteBuffer.allocate(1024);
String content = "";
while(sc.read(buff)>0)
{
sc.read(buff);
buff.flip();
content += charset.decode(buff);
}
System.out.println("聊天資訊:"+ content);
sk.interestOps(SelectionKey.OP_READ);
}
}
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
public static void main(String[] args) throws IOException
{
new NClient().init();
}
}