1. 程式人生 > >JAVA中的BIO,NIO與多路複用(select,poll,epoll)

JAVA中的BIO,NIO與多路複用(select,poll,epoll)

1.Socket連結的建立

   java程式(server) 在應用空間中執行,當建立一個socket連結時,會向核心空間中的核心程式(sc)傳送指令,核心程式中一定會執行Socket(AF_UNIX,SOCK_STAM,0) -> fd(檔案識別符號)6  --傳遞-> bind(6,9999) 繫結埠和檔案識別符號 --監聽-->listen(6)

2.BIO (Blocking I/O)

  概念:同步阻塞型IO,對於客戶端的每個連結都將開啟一個新的執行緒處理.

  流程:在BIO中apccet的阻塞是在核心程式中阻塞,在使用strace 監控執行緒時可以看到 accept(6, 後停止列印,直到有連結建立然後才會繼續剩餘程式碼的執行. IO的阻塞在strace中可以看到是recv(5 停止列印直到有資料傳輸  

  優點:可以與多個客戶端建立連結

  缺點:一連結一執行緒,導致記憶體浪費.連結建立的越多,CPU執行緒切換更加頻繁.

  程式碼:

 1 2 
 3 import java.io.*;
 4 import java.net.ServerSocket;
 5 import java.net.Socket;
 6 import java.util.concurrent.*;
 7 
 8 /**
 9  * @author baiyang
10  * @version 1.0
11  * @date 2020/6/9 11:10 下午
12  */
13 public class Server {
14 
15     public static void main(String[] args) throws Exception {
16 
17         ExecutorService executorService = Executors.newFixedThreadPool(50);
18         final ServerSocket serverSocket = new ServerSocket(9999);
19         while (true){
20             Socket socket = serverSocket.accept(); // 阻塞
21             new Thread(() ->{
22                 BufferedReader bufferedReader = null;
23                 int port = socket.getPort();
24                 System.out.println(port);
25                 try {
26                     bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream()))); // IO阻塞
27                     String clientMessage = bufferedReader.readLine();
28                     System.out.println(clientMessage);
29 
30                     BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new BufferedOutputStream(socket.getOutputStream())));
31                     bufferedWriter.write("this is service\n");
32                     bufferedWriter.flush();
33                     bufferedWriter.close();
34                     bufferedReader.close();
35                 } catch (IOException e) {
36                     e.printStackTrace();
37                 }
38             }).start();
39 
40         }
41     }
42 }
 1 package com.diandian.client.bio;
 2 
 3 import java.io.*;
 4 import java.net.InetSocketAddress;
 5 import java.net.Socket;
 6 
 7 /**
 8  * @author baiyang
 9  * @version 1.0
10  * @date 2020/6/9 11:18 下午
11  */
12 public class Client {
13 
14     public static void main(String[] args) throws Exception {
15         Socket socket = new Socket();
16         socket.connect(new InetSocketAddress(9090));
17         BufferedWriter bufferedWriter = new BufferedWriter(new OutputStreamWriter(new BufferedOutputStream(socket.getOutputStream())));
18         bufferedWriter.write("this is client\n");
19         bufferedWriter.flush();
20         BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new BufferedInputStream(socket.getInputStream())));
21         String clientMessage = bufferedReader.readLine();
22         bufferedReader.close();
23         bufferedWriter.close();
24         System.out.println(clientMessage);
25 
26 
27     }
28 }
View Code

3.NIO(java -> New I/O  作業系統 -> NONBLOCKING)

概念:在java中NIO 指的是new IO 1.4版本之後新的一個包出現

  在作業系統中的體現是NONBLOCKING的支援

  同步非阻塞

流程:在NIO中,ServerSocketChannel類中呼叫方法configureBlocking(false),accept將不在阻塞.當沒有連結時accept() = -1,當客戶端沒有傳送資訊時recv() = -1.BIO中的阻塞就解決了

優點: 解決了BIO多執行緒的問題,解決了C10K問題

缺點: 迴圈遍歷已連結的客戶端,實現監控是否有資料寫入. ---> 每一次的迴圈將會向核心程式傳送一條指令,假設有1W個連結建立且只有1個客戶端向伺服器傳送指令,那麼需要向核心傳送1W次指令,無效指令數9999次.

程式碼:

 1 package com.diandian.server.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.ServerSocketChannel;
 6 import java.nio.channels.SocketChannel;
 7 import java.util.ArrayList;
 8 
 9 /**
10  * @author baiyang
11  * @version 1.0
12  * @date 2020/6/11 11:32 下午
13  */
14 public class NioServer {
15 
16     public static void main(String[] args) {
17         //  儲存已連結的客戶端
18         ArrayList<SocketChannel> clients = new ArrayList<>();
19         try {
20             ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
21             serverSocketChannel.bind(new InetSocketAddress(9999));
22             // 設定false不阻塞 呼叫 OS  NONBLOCKING
23             serverSocketChannel.configureBlocking(false);
24             while (true) {
25                 // 可有可無方便測試
26                 Thread.sleep(2000);
27                 SocketChannel client = serverSocketChannel.accept();
28                 if (null == client) {
29                     System.out.println("沒有客戶端建立連結");
30                 } else {
31                     client.configureBlocking(false);
32                     clients.add(client);
33                     System.out.println(client.socket().getPort());
34                 }
35                 ByteBuffer byteBuffer = ByteBuffer.allocateDirect(1024);
36                 for (SocketChannel s : clients){
37                     int read = s.read(byteBuffer);
38                     if(read > 0){
39                         byteBuffer.flip();
40                         byte[] bytes = new byte[byteBuffer.limit()];
41                         byteBuffer.get(bytes);
42                         System.out.println("收到客戶端資訊:"+new String(bytes));
43                         byteBuffer.clear();
44                     }
45                 }
46 
47             }
48         } catch (Exception e) {
49             e.printStackTrace();
50         }
51     }
52 }

 

 

4.多路複用

 本質上select poll epoll 都是IO同步的,讀寫操作在就緒後都將由自己進行讀寫,所以讀寫操作是阻塞的

 1 package com.diandian.service.nio;
 2 
 3 import java.net.InetSocketAddress;
 4 import java.nio.ByteBuffer;
 5 import java.nio.channels.SelectionKey;
 6 import java.nio.channels.Selector;
 7 import java.nio.channels.ServerSocketChannel;
 8 import java.nio.channels.SocketChannel;
 9 import java.util.Iterator;
10 import java.util.Set;
11 
12 /**
13  * @author baiyang
14  * @version 1.0
15  * @date 2020/6/12 9:46 上午
16  */
17 public class NioSelect {
18 
19     public static void main(String[] args) {
20         try {
21             ServerSocketChannel server = ServerSocketChannel.open();
22             server.configureBlocking(false);
23             server.bind(new InetSocketAddress(9999));
24             // 開啟selector
25             Selector selector = Selector.open();
26             // 將accept註冊到selector
27             server.register(selector,SelectionKey.OP_ACCEPT);
28             while (true){
29                 System.out.println( " key Sizes:" + selector.selectedKeys().size());
30                 while (selector.select(1000) > 0){
31                     Set<SelectionKey> keys = selector.selectedKeys();
32                     Iterator<SelectionKey> iterator = keys.iterator();
33                     while (iterator.hasNext()){
34                         SelectionKey next = iterator.next();
35                         // 移除防止重複遍歷
36                         iterator.remove();
37                         // 是否有新的連結進來
38                         if(next.isAcceptable()){
39                             ServerSocketChannel channel = (ServerSocketChannel) next.channel();
40                             SocketChannel cline = channel.accept();
41                             // 設定非阻塞
42                             cline.configureBlocking(false);
43                             System.out.println(cline.socket().getPort());
44                             ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
45                             // 將read事件註冊到selector中
46                             cline.register(selector,SelectionKey.OP_READ,buffer);
47                         }else if (next.isReadable()){
48                             SocketChannel client = (SocketChannel) next.channel();
49                             ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
50                             buffer.clear();
51                             int read;
52                             while (true) {
53                                 read = client.read(buffer);
54                                 if (read > 0) {
55                                     buffer.flip();
56                                     while (buffer.hasRemaining()) {
57                                         client.write(buffer);
58                                     }
59                                     buffer.clear();
60                                 } else if (read == 0) {
61                                     break;
62                                 } else {
63                                     client.close();
64                                     break;
65                                 }
66                             }
67                         }
68 
69                     }
70                 }
71             }
72 
73         } catch (Exception e) {
74             e.printStackTrace();
75         }
76     }
77 }

 

 

  4.1 select poll

  概念:NIO中解決多次向核心程式傳送無效指令問題

  流程: select(fd) --全量遍歷--> 使用者空間

  優點:減少了向核心程式傳送指令次數,一次將所有連結的fd傳送給核心,由核心遍歷

  缺點:每次都將全量的fd進行傳送,核心將進行全量遍歷,只有一個selector來來進行監控accept() IO操作

  select和poll的區別: 本質上是一致的, poll用連結串列儲存無連結限制. select 監聽fd有連結數量限制,LINUX32位預設 1024 64位預設 2048 

  4.2 epoll (even poll)

概念:會將有發生IO操作的fd通知到應用程式
優點:不再每次進行全量的遍歷 複雜度降低至O(1)可使用多個selector進行監控
流程: 建立連結後 epoll_create(256) -> 7 ----> epoll_crl(7,ADD,6(scoket建立時返回的檔案識別符號),accept) 將6的accept註冊到7空間(紅黑樹儲存fd)中 ----> epoll_wait() 阻塞的等待客戶端的連結 O(1) -accept(6)->8 與客戶端建立連結8 -> epoll_crl(7,ADD,8,read)將讀事件註冊到7空間 ----> epoll_wait(6,8)

&n