基於Javasocket NIO的一個CS聊天室
阿新 • • 發佈:2018-11-23
不同的SelectableChannel所支援的操作是不同的。例如ServerSocketChannel代表一個ServerSocket,它就只支援OP_ACCEPT操作;
當Selector上註冊的所有Channel都沒有需要處理的IO操作的時候,select方法將會被阻塞,呼叫該方法的執行緒被阻塞。
int select();//預設阻塞
int select(long timeout);//設定超時
int selectNow();//立即返回
伺服器上的所有的Channel(ServerSocketChannel 和 SocketChannel)都需要向selector註冊。
伺服器端需要使用ServerSocketChannel來監聽客戶端的連線請求。
ServerSocketChannel server = ServerSocketChannel.open();
InetSocketAddress isa = new InetSocketAddress("127.0.0.1",30000);
server.bind(isa);
server.configureBlocking(false);
server.register(selector,SelectionKey.OP_ACCEPT);
監聽到客戶端連線請求時,返回一個SocketChannel例項。
伺服器端:
package com.nanhao.server; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.Charset; public class Server { private Selector selector = null; static final int PORT = 30000; static final int BUFFSIZE = 1024; //定義實現編碼解碼的字符集物件 private Charset charSet = Charset.forName("UTF-8"); public void init()throws IOException{ selector = Selector.open(); ServerSocketChannel server = ServerSocketChannel.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT); server.bind(isa); //設定ServerSocket以非阻塞的方式進行 server.configureBlocking(false); //將server註冊到selector裡面(每個套接字具有的註冊功能) server.register(selector, SelectionKey.OP_ACCEPT); while(selector.select()>0){ for(SelectionKey sk:selector.selectedKeys()){ //一旦正在處理這個套接字,那麼就要先從集合中刪除這個套接字 selector.selectedKeys().remove(sk); if(sk.isAcceptable()){ SocketChannel sc = server.accept(); //設定非阻塞模式 sc.configureBlocking(false); //將該套接字註冊到selector裡面 sc.register(selector,SelectionKey.OP_READ); //將之前的sk修改為準備接受其他請求 sk.interestOps(SelectionKey.OP_ACCEPT); } if(sk.isReadable()){ SocketChannel sc = (SocketChannel)sk.channel(); //定義準備接受資料的BUFFER ByteBuffer buff = ByteBuffer.allocate(BUFFSIZE); String context = ""; //開始讀取資料 try{ while(sc.read(buff)>0){ buff.flip(); //實現解碼 context += charSet.decode(buff); } System.out.println("讀取的資料:"+context); //將此套接字對應的channel設定成準備下一次讀取 sk.interestOps(SelectionKey.OP_READ); //如果捕獲到該SK對應的channel出現異常的話,即表明該channel對應的client出現了問題 //所以從Selector裡面取消sk的註冊。 }catch(IOException io){ sk.cancel(); if(sk.channel() !=null){ sk.channel().close(); } } if(context.length()>0){ for(SelectionKey key :selector.keys()){ //獲取key對應的channel Channel targetChannel = key.channel(); if(targetChannel instanceof SocketChannel){ SocketChannel dest = (SocketChannel) targetChannel; //實現編碼 dest.write(charSet.encode(context)); } } } } } } } public static void main(String[]args) throws IOException{ new Server().init(); } }
客戶端:
package com.nanhao.client; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Scanner; public class Client{ private Selector selector =null; static final int PORT=30000; static final int BUFFSIZE = 1024; private Charset charset = Charset.forName("UTF-8"); //建立客戶端套接字 private SocketChannel sc = null; public void init()throws IOException { selector = Selector.open(); InetSocketAddress isa = new InetSocketAddress("127.0.0.1",PORT); //呼叫靜態open方法建立連線到指定主機的SocketChannel sc = SocketChannel.open(); //設定非阻塞的模式 sc.configureBlocking(false); //註冊到Selector sc.register(selector, SelectionKey.OP_READ); //啟動讀取伺服器端資料庫資料的執行緒 new ClientThread().start(); //建立鍵盤輸入流 Scanner scanner = new Scanner(System.in); while(scanner.hasNextLine()){ String line = scanner.nextLine(); //將鍵盤的內容寫到SocketChannel sc.write(charset.encode(line)); } } private class ClientThread extends Thread { public void run(){ try{ while(selector.select()>0){ //遍歷每個IO可用的channel對應的SelectorKey for(SelectionKey sk :selector.selectedKeys()){ selector.selectedKeys().remove(sk); if(sk.isReadable()){ SocketChannel sc = (SocketChannel)sk.channel(); //建立buff ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFSIZE); String context = ""; while(sc.read(byteBuffer)>0){ //清空記憶體 byteBuffer.flip(); context += charset.decode(byteBuffer); } System.out.println("聊天資訊:"+context); sk.interestOps(SelectionKey.OP_READ); } } } }catch(IOException io){ io.printStackTrace(); } } } public static void main(String[]args) throws IOException{ new Client().init(); } }