1. 程式人生 > >第一章 java nio三大組件與使用姿勢

第一章 java nio三大組件與使用姿勢

鏈路 循環 true tro 進程 案例 [] ase system

本案例來源於《netty權威指南》

一、三大組件

  • Selector:多路復用器。輪詢註冊在其上的Channel,當發現某個或者多個Channel處於“就緒狀態”後(accept接收連接事件、connect連接完成事件、read讀事件、write寫事件),從阻塞狀態返回就緒的Channel的SelectionKey集合,之後進行IO操作。
  • Channel:封裝了socket。
    • ServerSocketChannel:封裝了ServerSocket,用於accept客戶端連接請求;
    • SocketChannel:一對SocketChannel組成一條虛電路,進行讀寫通信
  • Buffer:用於存取數據,最主要的是ByteBuffer
    • position:下一個將被操作的字節位置
    • limit:在寫模式下表示可以進行寫的字節數,在讀模式下表示可以進行讀的字節數
    • capacity:Buffer的大小

二、服務端代碼

1、服務端啟動類

1 public class Server {
2     public static void main(String[] args) throws IOException {
3         new Thread(new ServerHandler(8080), "server-1").start();
4     }
5 }

創建一個任務ServerHandler,然後創建一條線程,啟動執行該任務。

2、邏輯處理類

 1
public class ServerHandler implements Runnable { 2 private Selector selector; 3 private ServerSocketChannel ssChannel; 4 5 public ServerHandler(int port) { 6 try { 7 //等價於 Selector selector = SelectorProvider.provider().openSelector(); 8 selector = Selector.open();
9 //等價於 SelectorProvider.provider().openServerSocketChannel() 10 ssChannel = ServerSocketChannel.open(); 11 ssChannel.configureBlocking(false); 12 ssChannel.bind(new InetSocketAddress(port), 1024); 13 ssChannel.register(selector, SelectionKey.OP_ACCEPT); 14 } catch (IOException e) { 15 e.printStackTrace(); 16 System.exit(1); 17 } 18 } 19 20 public void run() { 21 for (; ; ) { 22 try { 23 selector.select(); 24 Iterator<SelectionKey> it = selector.selectedKeys().iterator(); 25 while (it.hasNext()) { 26 SelectionKey key = it.next(); 27 it.remove(); 28 handleInput(key); 29 } 30 } catch (Throwable t) { 31 t.printStackTrace(); 32 } 33 } 34 35 } 36 37 private void handleInput(SelectionKey key) throws IOException { 38 if (key.isValid()) { 39 // 處理新接入的請求消息 40 if (key.isAcceptable()) { 41 // Accept the new connection 42 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); 43 SocketChannel sc = ssc.accept(); 44 sc.configureBlocking(false); 45 // Add the new connection to the selector 46 sc.register(selector, SelectionKey.OP_READ); 47 } 48 if (key.isReadable()) { 49 // Read the data 50 SocketChannel sc = (SocketChannel) key.channel(); 51 ByteBuffer readBuffer = ByteBuffer.allocate(1024); 52 int readBytes = sc.read(readBuffer); 53 if (readBytes > 0) { 54 readBuffer.flip(); 55 byte[] bytes = new byte[readBuffer.remaining()]; 56 readBuffer.get(bytes); 57 String body = new String(bytes, "UTF-8"); 58 System.out.println("The time server receive order : " 59 + body); 60 String currentTime = "QUERY TIME ORDER" 61 .equalsIgnoreCase(body) ? new java.util.Date( 62 System.currentTimeMillis()).toString() 63 : "BAD ORDER"; 64 doWrite(sc, currentTime); 65 } else if (readBytes < 0) { 66 // 對端鏈路關閉 67 key.cancel(); 68 sc.close(); 69 } else 70 ; // 讀到0字節,忽略 71 } 72 } 73 } 74 75 private void doWrite(SocketChannel channel, String response) 76 throws IOException { 77 if (response != null && response.trim().length() > 0) { 78 byte[] bytes = response.getBytes(); 79 ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); 80 writeBuffer.put(bytes); 81 writeBuffer.flip(); 82 channel.write(writeBuffer); 83 } 84 } 85 }

步驟:

1、創建一個Selector和ServerSocketChannel實例

2、配置ServerSocketChannel實例為非阻塞

3、ServerSocketChannel實例bind端口

4、將ServerSocketChannel實例註冊到selector上,監聽OP_ACCEPT事件

下面的任務在Server創建的新的線程中執行,不影響主線程執行其他邏輯

5、之後進入死循環

5.1、使用select.select()阻塞等待就緒事件(這裏是等待OP_ACCEPT事件),一旦有有就緒事件到達,立即向下執行

5.2、使用selector.selectedKeys()獲取已經就緒的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之後循環遍歷

5.3、從叠代器刪除該SelectionKey,防止下一次再被遍歷到

5.4、如果SelectionKey==OP_ACCEPT,則通過ServerSocketChannel.accept()創建SocketChannel,該SocketChannel是後續真正的與客戶端的SocketChannel進行通信的實體

5.5、配置新創建的SocketChannel實例為非阻塞,然後將該SocketChannel實例註冊到selector實例上,監聽OP_READ事件

5.6、等客戶端發出請求數據時,此處監聽到SelectionKey==OP_READ,則創建ByteBuffer實例,將SocketChannel中的數據讀取到ByteBuffer中,然後再創建ByteBuffer將信息寫回到SocketChannel(也就是說數據的讀寫一定要通過Buffer)

三、客戶端代碼

1、客戶端啟動類

1 public class Client {
2     public static void main(String[] args) {
3         new Thread(new ClientHandler("127.0.0.1", 8080), "client-1").start();
4     }
5 }

創建一個任務ClientHandler,然後創建一條線程,啟動執行該任務。

2、邏輯處理類

 1 public class ClientHandler implements Runnable {
 2     private String host;
 3     private int port;
 4 
 5     private Selector selector;
 6     private SocketChannel socketChannel;
 7 
 8     public ClientHandler(String host, int port) {
 9         this.host = host;
10         this.port = port;
11         try {
12             selector = Selector.open();
13             socketChannel = SocketChannel.open();
14             socketChannel.configureBlocking(false);
15         } catch (IOException e) {
16             e.printStackTrace();
17             System.exit(1);
18         }
19     }
20 
21     public void run() {
22         try {
23             doConnect();
24         } catch (IOException e) {
25             e.printStackTrace();
26             System.exit(1);
27         }
28         while (true) {
29             try {
30                 selector.select();
31                 Iterator<SelectionKey> it = selector.selectedKeys().iterator();
32                 while (it.hasNext()) {
33                     SelectionKey key = it.next();
34                     it.remove();
35                     handleInput(key);
36                 }
37             } catch (Exception e) {
38                 e.printStackTrace();
39                 System.exit(1);
40             }
41         }
42     }
43 
44     private void handleInput(SelectionKey key) throws IOException {
45         if (key.isValid()) {
46             // 判斷是否連接成功
47             SocketChannel sc = (SocketChannel) key.channel();
48             if (key.isConnectable()) {
49                 if (sc.finishConnect()) {
50                     sc.register(selector, SelectionKey.OP_READ);
51                     doWrite(sc);
52                 } else
53                     System.exit(1);// 連接失敗,進程退出
54             } else if (key.isReadable()) {
55                 ByteBuffer readBuffer = ByteBuffer.allocate(1024);
56                 int readBytes = sc.read(readBuffer);
57                 if (readBytes > 0) {
58                     readBuffer.flip();
59                     byte[] bytes = new byte[readBuffer.remaining()];
60                     readBuffer.get(bytes);
61                     String body = new String(bytes, "UTF-8");
62                     System.out.println("Now is : " + body);
63                 } else if (readBytes < 0) {
64                     // 對端鏈路關閉
65                     key.cancel();
66                     sc.close();
67                 } else
68                     ; // 讀到0字節,忽略
69             }
70         }
71 
72     }
73 
74     private void doConnect() throws IOException {
75         // 如果直接連接成功,則註冊到多路復用器上,發送請求消息,讀應答
76         if (socketChannel.connect(new InetSocketAddress(host, port))) {
77             socketChannel.register(selector, SelectionKey.OP_READ);
78             doWrite(socketChannel);
79         } else
80             socketChannel.register(selector, SelectionKey.OP_CONNECT);
81     }
82 
83     private void doWrite(SocketChannel sc) throws IOException {
84         byte[] req = "QUERY TIME ORDER".getBytes();
85         ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
86         writeBuffer.put(req);
87         writeBuffer.flip();
88         sc.write(writeBuffer);
89         if (!writeBuffer.hasRemaining())
90             System.out.println("Send order 2 server succeed.");
91     }
92 }

步驟:

1、創建一個Selector和SocketChannel實例

2、配置SocketChannel實例為非阻塞

下面的任務在Cilent創建的新的線程中執行,不影響主線程執行其他邏輯

3、SocketChannel.connect連接到server端,如果連接沒有馬上成功,將該SocketChannel實例註冊到selector上,監聽OP_CONNECT事件;如果連接成功,將該SocketChannel實例註冊到selector上,監聽OP_READ事件,之後寫數據給server端

4、之後進入死循環

4.1、使用select.select()阻塞等待就緒事件,一旦有有就緒事件到達,立即向下執行

4.2、使用selector.selectedKeys()獲取已經就緒的SelectionKey(即OP_ACCEPT/OP_CONECT/OP_READ/OP_WRITE)集合,之後循環遍歷

4.3、從叠代器刪除該SelectionKey,防止下一次再被遍歷到

4.4、如果SelectionKey==OP_CONNECT,將該SocketChannel實例註冊到selector上,監聽OP_READ事件,之後寫數據給server端;如果監聽到SelectionKey==OP_READ,則創建ByteBuffer實例,將SocketChannel中的數據讀取到ByteBuffer中

第一章 java nio三大組件與使用姿勢