作者:Grey

原文地址:Java IO學習筆記七:多路複用從單執行緒到多執行緒

前面提到的多路複用的服務端程式碼中, 我們在處理讀資料的同時,也處理了寫事件:

    public void readHandler(SelectionKey key) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read;
try {
while (true) {
read = client.read(buffer);
if (read > 0) {
buffer.flip();
while (buffer.hasRemaining()) {
client.write(buffer);
}
buffer.clear();
} else if (read == 0) {
break;
} else {
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

為了權責清晰一些,我們分開了兩個事件處理:

一個負責寫,一個負責讀

讀的事件處理, 如下程式碼

    public void readHandler(SelectionKey key) {
System.out.println("read handler.....");
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.clear();
int read = 0;
try {
while (true) {
read = client.read(buffer);
if (read > 0) {
client.register(key.selector(), SelectionKey.OP_WRITE, buffer);
} else if (read == 0) {
break;
} else {
client.close();
break;
}
}
} catch (IOException e) {
e.printStackTrace();
}
}

其中read > 0 即從客戶端讀取到了資料,我們才註冊一個寫事件:

client.register(key.selector(), SelectionKey.OP_WRITE, buffer);

其他事件不註冊寫事件。(PS:只要send-queue沒有滿,就可以註冊寫事件)

寫事件的處理邏輯如下:

    private void writeHandler(SelectionKey key) {
System.out.println("write handler...");
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer buffer = (ByteBuffer) key.attachment();
buffer.flip();
while (buffer.hasRemaining()) {
try {
client.write(buffer);
} catch (IOException e) {
e.printStackTrace();
}
}
buffer.clear();
key.cancel();
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}

寫完後,呼叫key.cancel() 取消註冊,並關閉客戶端。既然分了讀和寫的不同處理流程,那麼在主方法裡面呼叫的時候:

                    while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
} else if (key.isWritable()) {
writeHandler(key);
}
}

增加了

if (key.isWritable()) {
writeHandler(key);
}

測試一下,執行SocketMultiplexingV2.java

並通過一個客戶端連線進來:

nc 192.168.205.1 9090

客戶端傳送一些內容:

nc 192.168.205.1 9090
asdfasdfasf
asdfasdfasf

可以正常接收到資料。

考慮有一個fd執行耗時,在一個線性裡會阻塞後續FD的處理,同時,考慮資源利用,充分利用cpu核數。

我們來實現一個基於多執行緒的多路複用模型。

將N個FD分組(這裡的FD就是Socket連線),每一組一個selector,將一個selector壓到一個執行緒上(最好的執行緒數量是: cpu核數或者cpu核數*2)

每個selector中的fd是線性執行的。假設有100w個連線,如果有四個執行緒,那麼每個執行緒處理25w個。

分組的FD和處理這堆FD的Selector我們封裝到一個數據結構中,假設叫:SelectorThread,其成員變數至少有如下:

public class SelectorThread  {
...
Selector selector = null;
// 存Selector對應要處理的FD佇列
LinkedBlockingQueue<Channel> lbq = new LinkedBlockingQueue<>();
...
}

由於其處理是線性的,且我們要開很多個執行緒來處理,所以SelectorThread本身是一個執行緒類(實現Runnable介面)

public class SelectorThread implements Runnable {
... }

在run方法中,我們就可以把之前單執行緒處理selector的常規操作程式碼移植過來:

....
while (true) {
....
if (selector.select() > 0) {
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
acceptHandler(key);
} else if (key.isReadable()) {
readHandler(key);
} else if (key.isWritable()) {
}
}
}
....
}
....

SelectorThread設計好以後,我們需要一個可以組織SelectorThread的類,假設叫SelectorThreadGroup,這個類的主要職責就是安排哪些FD由哪些Selector來接管,這個類裡面持有兩個SelectorThread陣列,一個用於分配服務端,一個用於分配每次客戶端的Socket請求。

// 服務端,可以啟動多個服務端
SelectorThread[] bosses;
// 客戶端的Socket請求
SelectorThread[] workers;

構造器中初始化這兩個陣列

    SelectorThreadGroup(int bossNum, int workerNum) {
bosses = new SelectorThread[bossNum];
workers = new SelectorThread[workerNum];
for (int i = 0; i < bossNum; i++) {
bosses[i] = new SelectorThread(this);
new Thread(bosses[i]).start();
}
for (int i = 0; i < workerNum; i++) {
workers[i] = new SelectorThread(this);
new Thread(workers[i]).start();
}
}

以下程式碼是針對每次的請求,如何分配Selector:

...
public void nextSelector(Channel c) {
try {
SelectorThread st;
if (c instanceof ServerSocketChannel) {
st = nextBoss();
st.lbq.put(c);
st.setWorker(workerGroup);
} else {
st = nextWork();
st.lbq.add(c);
}
st.selector.wakeup();
} catch (InterruptedException e) {
e.printStackTrace();
}
} private SelectorThread nextBoss() {
int index = xid.incrementAndGet() % bosses.length;
return bosses[index];
} private SelectorThread nextWork() {
int index = xid.incrementAndGet() % workers.length; //動用worker的執行緒分配
return workers[index];
} ...

這裡要區分兩類Channel,一類是ServerSocketChannel,即我們每次啟動的服務端,另外一類就是連線服務端的Socket請求,這兩類最好是分到不同的SelectorThread中的佇列中去。分配的演算法是樸素的輪詢演算法(除以陣列長度取模)

這樣我們主函式只需要和SelectorThreadGroup互動即可:


public class Startup { public static void main(String[] args) {
// 開闢了三個SelectorThread給服務端,開闢了三個SelectorThread給客戶端去接收Socket
SelectorThreadGroup group = new SelectorThreadGroup(3,3);
group.bind(9999);
group.bind(8888);
group.bind(6666);
group.bind(7777);
}
}

啟動Startup,

開啟一個客戶端,請求服務端,測試一下:

[root@io io]# nc 192.168.205.1 7777
sdfasdfs
sdfasdfs

客戶端請求的資料可以返回,服務端可以監聽到客戶端的請求:

Thread-1 register listen
Thread-0 register listen
Thread-2 register listen
Thread-1 register listen
Thread-1 acceptHandler......
Thread-5 register client: /192.168.205.138:44152

因為我們開了四個埠的監聽,但是我們只設置了三個服務端SelectorThread,所以可以看到Thread-1監聽了兩個服務端。

新接入的客戶端連線是從Thread-5開始的,不會和前面的Thread-0,Thread-1,Thread-2衝突。

再次來一個新的客戶端連線

[root@io io]# nc 192.168.205.1 8888
sdfasdfas
sdfasdfas

輸入一些內容,依然可以得到服務端的響應

服務端這邊日誌顯示:

Thread-3 register client: /192.168.205.138:33262
Thread-3 read......

顯示是Thread-3捕獲了新的連線,也不會和前面的Thread-0,Thread-1,Thread-2衝突。

完整原始碼:Github