1. 程式人生 > >IO、NIO實現簡單聊天室,附帶問題解析

IO、NIO實現簡單聊天室,附帶問題解析

>   本篇文章主要使用IO和NIO的形式來實現一個簡單的聊天室,並且說明IO方法存在的問題,而NIO又是如何解決的。 > >   大概的框架為,先提供思路和大概框架圖——程式碼——問題及解決方式,這樣會容易看一點。 # 1. IO寫法 ## 1.1 思路框架 >   下面編寫一個簡單的聊天室,大概需要的功能就是服務端維護一個聊天室,裡邊的客戶端傳送訊息之後服務將其訊息轉發給其他客戶端,達到一個聊天室的效果。   大致的思路:**服務端區分職責,分成兩部分,主執行緒負責接收連線並把連線放入到執行緒池中處理,維護一個執行緒池,所有對於socket的處理都交給執行緒池中的執行緒來處理。如下圖。** ![socket架構圖](https://images.cnblogs.com/cnblogs_com/zhangweicheng/1583123/o_200721144427IO%E6%9E%B6%E6%9E%84%E5%9B%BE.jpg)   下面貼上**demo**程式碼(程式碼中有幾處為了方便並沒有採用最規範的定義方式,如執行緒池的建立和**Map**初始化的時候未設定初始容量等)   **程式碼分五個類,服務端(ChatServer,監聽作用,為服務端主執行緒)、客戶端(ChatClient)、服務端處理器(ServerHandler,可以理解為執行緒池中要執行的事情)、客戶端處理器(ClientHandler,客戶端讀寫伺服器訊息的處理),工具類(SocketUtils,只有一個傳送訊息方法)。** ## 1.2 demo程式碼 **服務端:** ```java /** * 服務端啟動類 * 主要負責監聽客戶端連線 */ public class ChatServer { public static void main(String[] args) { ServerSocket serverSocket = null; /*----------為了方便使用Executors建立執行緒-------------*/ ExecutorService handlerThreadPool = Executors.newFixedThreadPool(100); try { serverSocket = new ServerSocket(8888); while (true) { System.out.println("-----------阻塞等待連線------------"); Socket socket = serverSocket.accept(); String key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); System.err.println(key + "已連線"); // 主執行緒只接收,處理直接交給處理執行緒池 handlerThreadPool.execute(new ServerHandler(socket)); } } catch (IOException e) { e.printStackTrace(); if (Objects.nonNull(serverSocket)) { try { serverSocket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } } } ``` **服務端處理類:** ```java /** * 服務端socket事件處理類 * 負責處理對應socket中的讀寫操作 */ public class ServerHandler implements Runnable { /** * 連線到服務端的所有連線 socket的地址埠->socket */ private static final Map socketMap = new ConcurrentHashMap<>(); /** * 維護名稱和地址的map */ private static final Map nameMap = new ConcurrentHashMap<>(); private Socket socket; /** * 每個socket的標識,使用地址+埠構成 */ private String key; public ServerHandler() { } public ServerHandler(Socket socket) { this.socket = socket; this.key = socket.getInetAddress().getHostAddress() + ":" + socket.getPort(); } @Override public void run() { Socket s = socket; // 根據訊息執行不同操作 InputStream inputStream; // debug檢視資料用 // Map tmpMap = socketMap; try { inputStream = s.getInputStream(); Scanner scanner = new Scanner(inputStream); while (true) { String line = scanner.nextLine(); if (line.startsWith("register")) { // 登記 String[] split = line.split(":"); String name = split[1]; String msg; // 校驗是否存在 if (socketMap.containsKey(key)) { msg = "請勿重複登記"; sendMsg(s, msg); return; } if (nameMap.containsValue(name)) { msg = "名稱已被登記,請換一個名稱"; sendMsg(s, msg); return; } // 通知自己已連線 sendMsg(s, "已連線到伺服器"); msg = name + "進入聊天室"; // 將訊息轉發給其他客戶端 sendMsgToClients(msg); // 放入socket池 socketMap.put(key, s); nameMap.put(key, name); System.err.println(name + "已登記"); } else if (line.trim().equalsIgnoreCase("end")) { if (notPassRegisterValidate()) { continue; } // 斷開連線 socketMap.remove(key); String name = nameMap.get(key); String msg = name + "離開聊天室"; System.err.println(msg); // 將訊息轉發給其他客戶端 sendMsgToClients(msg); msg = "已斷開連線"; // 傳送給對應的連線斷開資訊 sendMsg(s, msg); inputStream.close(); break; } else { if (notPassRegisterValidate()) { continue; } // 正常通訊 String name = nameMap.get(key); String msg = name + ":" + line; // 將訊息轉發給其他客戶端 sendMsgToClients(msg); } } } catch (IOException e) { e.printStackTrace(); } } /** * 是否已登入校驗 * * @return 是否已登入 */ private boolean notPassRegisterValidate() { boolean hasRegister = nameMap.containsKey(key); if (hasRegister) { return false; } String msg = "您還未登入,請先登入"; sendMsg(socket, msg); return true; } /** * 往連線傳送訊息 * * @param socket 客戶端連線 * @param msg 訊息 */ private void sendMsg(Socket socket, String msg) { SocketUtils.sendMsg(socket, msg); if (socket.isClosed()) { socketMap.remove(key); } } /** * 傳送給其他客戶端資訊 * * @param msg 資訊 */ private void sendMsgToClients(String msg) { for (Map.Entry entry : socketMap.entrySet()) { if (this.key.equals(entry.getKey())) { continue; } sendMsg(entry.getValue(), msg); } } } ``` **工具類(一個傳送訊息的方法):** ```java public class SocketUtils { private SocketUtils() { } public static void sendMsg(Socket socket, String msg) { Socket s = socket; OutputStream outputStream = null; msg += "\r\n"; try { outputStream = s.getOutputStream(); outputStream.write(msg.getBytes(StandardCharsets.UTF_8)); outputStream.flush(); } catch (IOException e) { System.err.println("傳送訊息失敗, 連線已斷開"); try { if (Objects.nonNull(outputStream)) { outputStream.close(); } socket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } } ``` **客戶端:** ```java /** * 客戶端讀和寫各自使用一個執行緒 */ public class ChatClient { public static void main(String[] args) { Socket socket; ExecutorService clientHandlerPool = Executors.newFixedThreadPool(2); try { socket = new Socket("localhost", 8888); // 寫執行緒 clientHandlerPool.execute(new ClientHandler(socket, 1)); // 讀執行緒 clientHandlerPool.execute(new ClientHandler(socket, 0)); } catch (IOException e) { e.printStackTrace(); } } } ``` **客戶端處理器:** ```java /** * 客戶端處理器 * 根據type來區分是做讀工作還是寫工作 */ public class ClientHandler implements Runnable { private Socket socket; /** * 處理型別,0-讀、1-寫 */ private int type; public ClientHandler() { throw new IllegalArgumentException("不能使用沒有引數的建構函式"); } public ClientHandler(Socket socket, int type) { this.socket = socket; this.type = type; } @Override public void run() { if (type == 1) { // 進行寫操作 doWriteJob(); return; } // 預設讀操作 doReadJob(); } /** * 讀操作 */ private void doReadJob() { Socket s = socket; InputStream inputStream; try { inputStream = s.getInputStream(); Scanner scanner = new Scanner(inputStream); while (true) { String line = scanner.nextLine(); if (null != line && !"".equals(line)) { System.err.println(line); } // 如果已退出了,那麼關閉連線 if ("已斷開連線".equals(line)) { socket.close(); break; } } } catch (IOException e) { e.printStackTrace(); try { socket.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } /** * 寫執行緒 */ private void doWriteJob() { Socket s = socket; try { Scanner scanner = new Scanner(System.in); while (true) { String output = scanner.nextLine(); if (Objects.nonNull(output) && !"".equals(output)) { SocketUtils.sendMsg(s, output); } } } catch (Exception e) { e.printStackTrace(); System.err.println("錯誤發生了:" + e.getMessage()); } } } ``` **結果:** ![IO結果圖](https://images.cnblogs.com/cnblogs_com/zhangweicheng/1583123/o_200721145521IO%E7%BB%93%E6%9E%9C%E5%9B%BE.jpg) > 思考:**當前這樣實現有什麼瓶頸,可能會出現什麼問題?** **存在問題:** > 1. 服務端使用**accept**阻塞接收執行緒,連線一個一個處理,在**高併發下處理效能緩慢**。 > 2. 沒有連線的時候執行緒一直處於**阻塞狀態**造成**資源的浪費**(如果使用多執行緒接收處理併發,那麼沒連線的時候造成多個執行緒的資源浪費)。 # 2. 使用NIO實現聊天室 ## 2.1 整體思路   那我們來看下**NIO**是怎麼解決上方的問題的,首先上這個**demo**整體的架構圖。 ![NIO架構圖](https://images.cnblogs.com/cnblogs_com/zhangweicheng/1583123/o_200721145529NIO%E6%9E%B6%E6%9E%84%E5%9B%BE.jpg)   **大概的邏輯為** > 1. 服務端將**ServerSocketChannel**註冊到**Selector**中,客戶端連線進來的時候事件觸發,將客戶端的連線註冊到selector中。 > 2. 主執行緒負責**selector**的輪詢工作,發現有事件可以處理就將其交給**執行緒池**。 > 3. 客戶端同理分成兩個部分,寫操作和讀操作,每個操作由一個執行緒單獨完成;但是如果讀操作處理使用while迴圈不斷輪詢等待接收的話,**CPU會飆升**,所以需要客戶端**新建一個selector來解決這個問題**,注意這個**selector**跟服務端不是同一個,沒有啥關係。   程式碼分類大致跟IO寫法一樣,**分成服務端、服務端處理器、客戶端、客戶端處理器**,下面為demo。 ## 2.2 程式碼 **服務端:** ```java public class ChatServer { private Selector selector; private ServerSocketChannel serverSocketChannel; private static final ExecutorService handlerPool = Executors.newFixedThreadPool(100); public ChatServer() throws IOException { this.selector = Selector.open(); serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.bind(new InetSocketAddress(9999)); // 將服務端的socket註冊到selector中,接收客戶端,並將其註冊到selector中,其本身也是selector中的一個I/O事件 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.err.println("聊天室服務端初始化結束"); } /** * 啟動方法 * 1.監聽,拿到之後進行處理 */ public void start() throws IOException { int count; while (true) { // 可能出現select方法沒阻塞,空輪詢導致死迴圈的情況 count = selector.select(); if (count > 0) { Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); // 交給執行緒池處理 handlerPool.execute(new ServerHandler(key, selector)); // 處理完成後移除 iterator.remove(); } } } } public static void main(String[] args) throws IOException { new ChatServer().start(); } } ``` **服務端處理器:** ```java public class ServerHandler implements Runnable { private SelectionKey key; private Selector selector; public ServerHandler() { } /** * 本來可以通過key拿到selector,這裡為了圖方便就這樣寫了 */ public ServerHandler(SelectionKey key, Selector selector) { this.key = key; this.selector = selector; } @Override public void run() { try { if (key.isAcceptable()) { // 說明是服務端的事件,注意這裡強轉換為的是ServerSocketChannel ServerSocketChannel channel = (ServerSocketChannel) key.channel(); // 接收連線 SocketChannel socket = channel.accept(); if (Objects.isNull(socket)) { return; } socket.configureBlocking(false); // 接收客戶端的socket並且將其註冊到服務端這邊的selector中,注意客戶端在此時跟服務端selector產生關聯 socket.register(selector, SelectionKey.OP_READ); System.err.println("服務端已接收連線"); } else if (key.isReadable()) { // 客戶端傳送資訊過來了 doReadJob(); } } catch (IOException e) { e.printStackTrace(); // 錯誤處理 } } /** * 讀取操作 */ private void doReadJob() throws IOException { SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer buffer = ByteBuffer.allocate(1024); int readCount = socketChannel.read(buffer); if (readCount > 0) { String msg = new String(buffer.array(), StandardCharsets.UTF_8); System.err.println(socketChannel.getRemoteAddress().toString() + "的資訊為:" + msg); // 轉發給其他客戶端 sendMsgToOtherClients(msg); } } /** * 轉發訊息給其他客戶端 * * @param msg 訊息 */ private void sendMsgToOtherClients(String msg) throws IOException { SocketChannel self = (SocketChannel) key.channel(); Set keys = selector.keys(); Iterator iterator = keys.iterator(); while (iterator.hasNext()) { SelectionKey selectionKey = iterator.next(); SelectableChannel channel = selectionKey.channel(); // 如果是本身或者不是socketChannel型別則跳過 if (self.equals(channel) || channel instanceof ServerSocketChannel) { continue; } SocketChannel socketChannel = (SocketChannel) channel; ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes(StandardCharsets.UTF_8)); socketChannel.write(byteBuffer); } } } ``` **客戶端:** ```java public class ChatClient { private Selector selector; private SocketChannel socketChannel; private static ExecutorService dealPool = Executors.newFixedThreadPool(2); public ChatClient() throws IOException { /* * 說明一下: * 客戶端這邊的selector跟剛才在服務端定義的selector是不同的兩個selector * 客戶端這邊不需要selector也能實現功能,但是讀取的時候必須不斷的迴圈,會導致CPU飆升, * 所以使用selector是為了解決這個問題的,別跟服務端的selector搞混就好 */ selector = Selector.open(); socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 9999)); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_READ); } public void start() throws IOException, InterruptedException { // 連線 // socketChannel.connect(new InetSocketAddress("localhost", 9999)); while (!socketChannel.finishConnect()) { System.err.println("正在連線..."); TimeUnit.MILLISECONDS.sleep(200); } System.err.println("連線成功"); // 使用兩個執行緒來分別處理讀取和寫操作 // 寫資料 dealPool.execute(new ClientHandler(selector, socketChannel, 1)); // 讀取資料 dealPool.execute(new ClientHandler(selector, socketChannel, 0)); } public static void main(String[] args) throws IOException, InterruptedException { new ChatClient().start(); } } ``` **客戶端處理器:** ```java public class ClientHandler implements Runnable { private Selector selector; private SocketChannel socketChannel; /** * 0-讀,1-寫 */ private int type; public ClientHandler() { } public ClientHandler(Selector selector, SocketChannel socketChannel, int type) { // selector是為了解決讀時候CPU飆升的問題,具體見客戶端的啟動類程式碼註釋 this.selector = selector; this.socketChannel = socketChannel; this.type = type; } @Override public void run() { try { if (type == 0) { doClientReadJob(); return; } doClientWriteJob(); } catch (IOException e) { e.printStackTrace(); } } /** * 寫操作 */ private void doClientWriteJob() throws IOException { SocketChannel sc = socketChannel; Scanner scanner = new Scanner(System.in); while (true) { if (scanner.hasNextLine()) { String line = scanner.nextLine(); if (null != line && !"".equals(line)) { ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)); sc.write(buffer); } } } } /** * 讀操作 */ private void doClientReadJob() throws IOException { SocketChannel sc = socketChannel; ByteBuffer buf = ByteBuffer.allocate(1024); while (true) { int select = selector.select(); if (select > 0) { Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { // 這是必須的,不然下方的remove會出錯 SelectionKey next = iterator.next(); // 這裡因為只有本身這個客戶端註冊到客戶端的selector中,所以有事件一定是它的,也就不用從key拿了,直接操作就行 buf.clear(); int read = sc.read(buf); if (read > 0) { String msg = new String(buf.array(), StandardCharsets.UTF_8); System.err.println(msg); } // 事件處理完之後要移除這個key,否則的話selector.select()方法不會再讀到這個key,即便有新的時間到這個channel來 iterator.remove(); } } } } } ```   **結果圖:** ![NIO結果圖](https://images.cnblogs.com/cnblogs_com/zhangweicheng/1583123/o_200721145536NIO%E7%BB%93%E6%9E%9C%E5%9B%BE.jpg) 在編寫的過程中發現了以下兩點: > 1. **select**方法之後如果存在**key**,並且接下來的操作未對這個**selectionKey**做**remove**操作,那麼**下次的select不會再將其選入**,即便有事件發生,也就是說,**select方法不會選擇之前已經選過的key。** > 2. **selector.select()方法中偶爾會出現不阻塞的情況**。這就是**NIO**中的空輪詢**bug**,也就是說,**沒有連線又不阻塞的話,while(true) ... 的寫法就是一個死迴圈,會導致CPU飆升。**   第二點問題在**NIO**框架(如**netty**)中都採用了比較好的解決方法,可以去查下如何解決的。接下來看下**NIO**的寫法是否解決了**IO**寫法中存在的問題: > 1. **服務端使用accept阻塞接收執行緒,連線一個一個處理,在高併發下處理效能緩慢。** > > **答:**上述寫法中還是使用**一個ServerSocketChannel**來接收客戶端,沒有解決這個問題;但是可以通過**使用執行緒池的方式來解決**。也就是說**將服務端的事件分成兩個部分,第一個部分為接收客戶端,使用一個執行緒池來維護;第二個部分為客戶端的事件處理操作,也維護一個執行緒池來執行這些事件。** > >   這樣效能上去了,**由於selector的存在也不會出現資源浪費的事情**,**netty**就是這麼做的哦。 > > 2. **沒有連線的時候執行緒一直處於阻塞狀態造成資源的浪費(如果使用多執行緒接收處理併發,那麼沒連線的時候造成多個執行緒的資源浪費)。** > > **答:**解決。NIO寫法主要有**selector**不斷輪詢,不會出現沒連線不作為的情況,而且多個連線的話也沒有問題(參考1的回答)。 # 3. 小結   兩種寫法都有Reactor模式的影子,但是IO寫法有明顯的缺點就是如果沒有連線會造成資源浪費的問題(採用多個接收連線的話更甚),而NIO中selector輪詢機制就很好的解決了無連線時無作為的情況,並且在效能方面可以通過職責分類和執行緒池來得到改善,所以,NIO,永遠滴神。   > 需要壓力,需要