
The Reactor Model: Single-Threaded Version

The Reactor model is a typical event-driven model. In network programming, the events in question are naturally actions such as read, write, bind, connect, and close. The Reactor model has many implementations; the three most basic ones are introduced below:

  • Single-threaded version
  • Multi-threaded version
  • Master-slave multi-threaded version
Keywords: Java NIO, Reactor model, Java concurrency, event-driven

Single-Threaded Version

The structure diagram (taken from Doug Lea's Scalable IO in Java) is as follows:

  [Figure: Reactor model diagram]
In the figure above, the Reactor is a typical event dispatch center. When a client initiates a request and establishes a connection, it triggers the SelectionKey.OP_ACCEPT event registered on the multiplexer Selector; the Acceptor object attached to that event is responsible for accepting the request and preparing for the read and write operations that follow.

The Reactor design is as follows:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Reactor
 *
 * @author wqx
 */
public class Reactor implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Reactor.class);

    private Selector selector;
    private ServerSocketChannel ssc;

    private Handler DEFAULT_HANDLER = new Handler() {
        @Override
        public void processRequest(Processor processor, ByteBuffer msg) {
            // NOOP
        }
    };
    private Handler handler = DEFAULT_HANDLER;

    /**
     * Startup phase
     *
     * @param port
     * @throws IOException
     */
    public Reactor(int port, int maxClients, Handler serverHandler) throws IOException {
        selector = Selector.open();
        ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.socket().bind(new InetSocketAddress(port));

        this.handler = serverHandler;
        SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT);
        sk.attach(new Acceptor());
    }

    /**
     * Polling phase
     */
    @Override
    public void run() {
        while (!ssc.socket().isClosed()) {
            try {
                selector.select(1000);
                Set<SelectionKey> keys;
                synchronized (this) {
                    keys = selector.selectedKeys();
                }
                Iterator<SelectionKey> it = keys.iterator();
                while (it.hasNext()) {
                    SelectionKey key = it.next();
                    dispatch(key);
                    it.remove();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        close();
    }

    public void dispatch(SelectionKey key) {
        Runnable r = (Runnable) key.attachment();
        if (r != null) {
            r.run();
        }
    }

    /**
     * Acceptor for accepting TCP connections
     */
    class Acceptor implements Runnable {
        @Override
        public void run() {
            SocketChannel sc;
            try {
                sc = ssc.accept();
                if (sc != null) {
                    new Processor(Reactor.this, selector, sc);
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public void close() {
        try {
            selector.close();
            if (LOG.isDebugEnabled()) {
                LOG.debug("Close selector");
            }
        } catch (IOException e) {
            LOG.warn("Ignoring exception during close selector, e=" + e);
        }
    }

    public void processRequest(Processor processor, ByteBuffer msg) {
        if (handler != DEFAULT_HANDLER) {
            handler.processRequest(processor, msg);
        }
    }
}

The code above is a typical single-threaded Reactor implementation. While the Reactor object is being instantiated, the OP_ACCEPT event is registered on the current multiplexer Selector. When an OP_ACCEPT event occurs, the Reactor runs the Acceptor's run method via dispatch. The Acceptor's main job is to accept the request, establish the connection, and pass the SocketChannel representing the new connection as a constructor argument to a Processor object.

The Processor's job is to perform the I/O operations.

Here is the Processor source:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.concurrent.LinkedBlockingQueue;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Server Processor
 *
 * @author wqx
 */
public class Processor implements Runnable {

    private static final Logger LOG = LoggerFactory.getLogger(Processor.class);

    Reactor reactor;
    private SocketChannel sc;
    private final SelectionKey sk;

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer inputBuffer = lenBuffer;
    private ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);
    private LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();

    public Processor(Reactor reactor, Selector sel, SocketChannel channel) throws IOException {
        this.reactor = reactor;
        sc = channel;
        sc.configureBlocking(false);
        sk = sc.register(sel, SelectionKey.OP_READ);
        sk.attach(this);
        sel.wakeup();
    }

    @Override
    public void run() {
        if (sc.isOpen() && sk.isValid()) {
            if (sk.isReadable()) {
                doRead();
            } else if (sk.isWritable()) {
                doSend();
            }
        } else {
            LOG.error("try to do read/write operation on null socket");
            try {
                if (sc != null) sc.close();
            } catch (IOException e) {}
        }
    }

    private void doRead() {
        try {
            int byteSize = sc.read(inputBuffer);
            if (byteSize < 0) {
                LOG.error("Unable to read additional data");
            }
            if (!inputBuffer.hasRemaining()) {
                if (inputBuffer == lenBuffer) {
                    // read length
                    inputBuffer.flip();
                    int len = inputBuffer.getInt();
                    if (len < 0) {
                        throw new IllegalArgumentException("Illegal data length");
                    }
                    // prepare for receiving data
                    inputBuffer = ByteBuffer.allocate(len);
                } else {
                    // read data
                    if (inputBuffer.hasRemaining()) {
                        sc.read(inputBuffer);
                    }
                    if (!inputBuffer.hasRemaining()) {
                        inputBuffer.flip();
                        processRequest();
                        // clear lenBuffer and wait for the next read
                        lenBuffer.clear();
                        inputBuffer = lenBuffer;
                    }
                }
            }
        } catch (IOException e) {
            LOG.error("Unexpected exception during read. e=" + e);
            try {
                if (sc != null) sc.close();
            } catch (IOException e1) {
                LOG.warn("Ignoring exception when close socketChannel");
            }
        }
    }

    /**
     * process request and get response
     */
    private void processRequest() {
        reactor.processRequest(this, inputBuffer);
    }

    private void doSend() {
        try {
            /*
             * write data to the channel:
             * step 1: write the length of the data (occupies 4 bytes)
             * step 2: write the data content
             */
            if (outputQueue.size() > 0) {
                ByteBuffer directBuffer = outputDirectBuffer;
                directBuffer.clear();

                for (ByteBuffer buf : outputQueue) {
                    buf.flip();
                    if (buf.remaining() > directBuffer.remaining()) {
                        // prevent BufferOverflowException
                        buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
                    }
                    // transfer the bytes remaining in buf into directBuffer
                    int p = buf.position();
                    directBuffer.put(buf);
                    // reset position
                    buf.position(p);
                    if (!directBuffer.hasRemaining()) {
                        break;
                    }
                }
                directBuffer.flip();
                int sendSize = sc.write(directBuffer);

                while (!outputQueue.isEmpty()) {
                    ByteBuffer buf = outputQueue.peek();
                    int left = buf.remaining() - sendSize;
                    if (left > 0) {
                        buf.position(buf.position() + sendSize);
                        break;
                    }
                    sendSize -= buf.remaining();
                    outputQueue.remove();
                }
            }
            synchronized (reactor) {
                if (outputQueue.size() == 0) {
                    // disable write
                    disableWrite();
                } else {
                    // enable write
                    enableWrite();
                }
            }
        } catch (CancelledKeyException e) {
            LOG.warn("CancelledKeyException occurred, e=" + e);
        } catch (IOException e) {
            LOG.warn("Exception causing close, due to " + e);
        }
    }

    public void sendBuffer(ByteBuffer bb) {
        try {
            synchronized (this.reactor) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("add sendable bytebuffer into outputQueue");
                }
                // wrap ByteBuffer with length header
                ByteBuffer wrapped = wrap(bb);
                outputQueue.add(wrapped);
                enableWrite();
            }
        } catch (Exception e) {
            LOG.error("Unexpected exception: ", e);
        }
    }

    private ByteBuffer wrap(ByteBuffer bb) {
        bb.flip();
        lenBuffer.clear();
        int len = bb.remaining();
        lenBuffer.putInt(len);
        ByteBuffer resp = ByteBuffer.allocate(len + 4);
        lenBuffer.flip();
        resp.put(lenBuffer);
        resp.put(bb);
        return resp;
    }

    private void enableWrite() {
        int i = sk.interestOps();
        if ((i & SelectionKey.OP_WRITE) == 0) {
            sk.interestOps(i | SelectionKey.OP_WRITE);
        }
    }

    private void disableWrite() {
        int i = sk.interestOps();
        // OP_WRITE is a bitmask, so test against 0 rather than 1
        if ((i & SelectionKey.OP_WRITE) != 0) {
            sk.interestOps(i & (~SelectionKey.OP_WRITE));
        }
    }
}

What the Processor does is actually quite simple: it registers the read/write events it is interested in (OP_READ or OP_WRITE) with the selector, waits for an event to fire, and then performs the corresponding operation.

    @Override
    public void run() {
        if (sc.isOpen() && sk.isValid()) {
            if (sk.isReadable()) {
                doRead();
            } else if (sk.isWritable()) {
                doSend();
            }
        } else {
            LOG.error("try to do read/write operation on null socket");
            try {
                if (sc != null) sc.close();
            } catch (IOException e) {}
        }
    }

The doRead() and doSend() methods are a bit more involved: they handle a problem that every TCP-based protocol has to solve, the TCP sticky/split packet problem.

The TCP Sticky/Split Packet Problem

As we all know, TCP is a byte-stream-oriented protocol, and a byte stream is continuous: it carries no inherent boundaries for application-level messages. See the figure below:


  [Figure: sticky/split packet illustration]

The figure shows three application-layer packets: D1, D2, and D3. When the application-layer data is handed to the transport layer, sticking or splitting may occur.

TCP's basic transmission unit is the segment, and each segment's maximum payload is limited. With a typical Ethernet MTU of 1500 bytes, subtracting the 20-byte IP header and the 20-byte TCP header leaves 1460 bytes as the transport layer's maximum segment payload. If a piece of application-layer data is larger than that (like data block D2 in the figure above), the transport layer will split it and the receiver has to reassemble it.

Solutions

  1. Fixed-length messages (both sides always send messages of a fixed size; the drawback is obvious: it wastes bandwidth)
  2. A delimiter between messages (drawbacks: encoding/decoding costs time, and if the message body itself contains the delimiter it has to be escaped, which is inefficient)
  3. Prepend a header to each packet (the header states the length of the data that follows; this is exactly the approach the TCP and IP protocols themselves use)

Adopting Solution 3

The packet layout is illustrated below:


  [Figure: packet structure]

The header occupies 4 bytes and holds the length of the data that follows. Simple enough.
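To make the scheme concrete, here is a minimal sketch of this framing (class and method names are my own illustration, not from the article):

import java.nio.ByteBuffer;

// Minimal length-prefixed framing: a 4-byte header holding the payload
// length, followed by the payload itself.
public final class Framing {

    // Encode: prepend the payload length as a 4-byte header.
    public static ByteBuffer encode(byte[] payload) {
        ByteBuffer frame = ByteBuffer.allocate(4 + payload.length);
        frame.putInt(payload.length); // header: length of the data
        frame.put(payload);           // body: the data itself
        frame.flip();                 // ready to be written to a channel
        return frame;
    }

    // Decode one frame from a buffer known to contain a complete frame.
    public static byte[] decode(ByteBuffer frame) {
        int len = frame.getInt();     // read the 4-byte header
        byte[] payload = new byte[len];
        frame.get(payload);
        return payload;
    }
}

The wrap() method in Processor above is the encode side of exactly this scheme; the decode side is spread across doRead, which is analyzed next.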

With the theory in place, let's walk through the Read and Write implementations:

doRead
inputBuffer receives the data and lenBuffer receives the data length; at initialization time, lenBuffer is assigned to inputBuffer. They are defined as follows:

private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
private ByteBuffer inputBuffer = lenBuffer;
  1. If inputBuffer == lenBuffer, read an int value len out of inputBuffer; this is the length of the data about to arrive. At the same time, allocate a buffer of size len and assign it to inputBuffer, ready to receive the data:
    private void doRead() {
        try {
            int byteSize = sc.read(inputBuffer);
            if (byteSize < 0) {
                LOG.error("Unable to read additional data");
            }
            if (!inputBuffer.hasRemaining()) {
                if (inputBuffer == lenBuffer) {
                    // read length
                    inputBuffer.flip();
                    int len = inputBuffer.getInt();
                    if (len < 0) {
                        throw new IllegalArgumentException("Illegal data length");
                    }
                    // prepare for receiving data
                    inputBuffer = ByteBuffer.allocate(len);
                } else {
                    // ... (case 2, see below)
                }
            }
        } catch (IOException e) {
            // ... (error handling as in the full source above)
        }
    }
  2. If inputBuffer != lenBuffer, start receiving the data itself:
if (inputBuffer == lenBuffer) {
    // ... (case 1, see above)
} else {
    // read data
    if (inputBuffer.hasRemaining()) {
        sc.read(inputBuffer);
    }
    if (!inputBuffer.hasRemaining()) {
        inputBuffer.flip();
        processRequest();
        // clear lenBuffer and wait for the next read
        lenBuffer.clear();
        inputBuffer = lenBuffer;
    }
}

Note

  1. A stage is complete only when its buffer is full, i.e. inputBuffer.hasRemaining() == false.
  2. After processRequest, inputBuffer is set back to lenBuffer, ready for the next read (see the standalone sketch below).
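For illustration, here is a self-contained sketch of the same two-phase state machine (class and method names are my own, not from the article), decoupled from the selector so it can be exercised with plain ByteBuffers:

import java.nio.ByteBuffer;

public class FrameDecoderSketch {

    private final ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    private ByteBuffer inputBuffer = lenBuffer;

    // Feed an arbitrary chunk of bytes; returns one complete frame, or null
    // if more bytes are needed. Call again with the same chunk to drain any
    // further frames it may contain.
    public byte[] feed(ByteBuffer chunk) {
        while (chunk.hasRemaining()) {
            // copy only as many bytes as the current stage still expects
            while (chunk.hasRemaining() && inputBuffer.hasRemaining()) {
                inputBuffer.put(chunk.get());
            }
            if (inputBuffer.hasRemaining()) {
                return null; // stage incomplete: wait for the next read event
            }
            if (inputBuffer == lenBuffer) {
                // stage 1 complete: length known, switch to a data buffer
                inputBuffer.flip();
                int len = inputBuffer.getInt();
                inputBuffer = ByteBuffer.allocate(len);
            } else {
                // stage 2 complete: a whole frame has arrived
                inputBuffer.flip();
                byte[] frame = new byte[inputBuffer.remaining()];
                inputBuffer.get(frame);
                lenBuffer.clear();
                inputBuffer = lenBuffer; // reset for the next frame
                return frame;
            }
        }
        return null;
    }
}

Feeding it the bytes of several concatenated frames, in chunks of any size, yields each frame exactly once; this is precisely what doRead achieves against a non-blocking channel.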

doSend

A caller sends data via the sendBuffer method, which really just adds the data to outputQueue, a send buffer queue.

public void sendBuffer(ByteBuffer bb) {
    try {
        synchronized (this.reactor) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("add sendable bytebuffer into outputQueue");
            }
            // wrap ByteBuffer with length header
            ByteBuffer wrapped = wrap(bb);
            outputQueue.add(wrapped);
            enableWrite();
        }
    } catch (Exception e) {
        LOG.error("Unexpected exception: ", e);
    }
}

The doSend method is then easy to follow: it simply keeps taking data off outputQueue and writing it to the channel. The process is as follows:

The data queued in outputQueue is written out through the buffer outputDirectBuffer:

  1. Clear outputDirectBuffer to prepare for sending
  2. Copy data from outputQueue into outputDirectBuffer
  3. Call socketChannel.write(outputDirectBuffer) to write outputDirectBuffer into the socket send buffer

During step 2 we may run into either of these situations:

1. A single packet is larger than the space remaining in outputDirectBuffer.

2. outputDirectBuffer has been filled, but outputQueue still holds data waiting to be sent.

During step 3, two further situations can occur:

1. All of outputDirectBuffer is written into the socket send buffer.

2. Only part of outputDirectBuffer, or none of it at all, is written into the socket send buffer.

The full implementation is in the source above; here we focus on a few key points:

Why does buf's position need to be reset?

int p = buf.position();
directBuffer.put(buf);
//reset position
buf.position(p);

The data written into directBuffer is exactly the data about to be written to the SocketChannel. The question is: when we call

int sendSize = sc.write(directBuffer);

has all of the data in directBuffer actually been written to the channel? Clearly that is not guaranteed (see the Javadoc of java.nio.channels.SocketChannel.write(ByteBuffer src)).
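As a hedged illustration of the general pattern (the helper below is my own sketch, not the article's code; the names sc, sk, and buf mirror the Processor fields above):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;

public final class PartialWrite {

    // Non-blocking pattern: write once, never spin. If bytes remain,
    // keep OP_WRITE registered so the selector fires again once the
    // socket send buffer has room; otherwise stop watching OP_WRITE.
    // (On a blocking channel you would instead just loop until
    // buf.hasRemaining() is false.)
    static void writeOrReschedule(SocketChannel sc, SelectionKey sk, ByteBuffer buf)
            throws IOException {
        sc.write(buf); // may consume anywhere from 0 to buf.remaining() bytes
        if (buf.hasRemaining()) {
            sk.interestOps(sk.interestOps() | SelectionKey.OP_WRITE);
        } else {
            sk.interestOps(sk.interestOps() & ~SelectionKey.OP_WRITE);
        }
    }
}

The doSend code achieves the same effect by comparing write's return value against the queued buffers, as analyzed next.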

How is this solved?

The idea is simple: using write's return value sendSize, walk the ByteBuffers in outputQueue and compare each buf.remaining() with sendSize to decide whether that buffer has actually been sent in full. As shown below:

while (!outputQueue.isEmpty()) {
    ByteBuffer buf = outputQueue.peek();
    int left = buf.remaining() - sendSize;
    if (left > 0) {
        buf.position(buf.position() + sendSize);
        break;
    }
    sendSize -= buf.remaining();
    outputQueue.remove();
}

With that, the network communication is essentially complete. The approach above follows the implementation of ZooKeeper's networking module; if you are interested, take a look at the corresponding ZooKeeper source.

Testing

Server side:

import java.io.IOException;

public class ServerTest {

    private static int PORT = 8888;

    public static void main(String[] args) throws IOException, InterruptedException {
        Thread t = new Thread(new Reactor(PORT, 1024, new MyHandler()));
        t.start();
        System.out.println("server start");
        t.join();
    }
}

The user-defined Handler:

import java.nio.ByteBuffer;

public class MyHandler implements Handler {

    @Override
    public void processRequest(Processor processor, ByteBuffer msg) {
        byte[] con = new byte[msg.remaining()];
        msg.get(con);
        String str = new String(con, 0, con.length);
        String resp = "";
        switch (str) {
            case "request1": resp = "response1"; break;
            case "request2": resp = "response2"; break;
            case "request3": resp = "response3"; break;
            default: resp = "";
        }
        ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
        buf.put(resp.getBytes());
        processor.sendBuffer(buf);
    }
}

Client side:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;

public class ClientTest {

    private static String HOST = "localhost";
    private static int PORT = 8888;

    public static void main(String[] args) throws IOException {
        Client client = new Client();
        client.socket().setTcpNoDelay(true);
        client.connect(new InetSocketAddress(HOST, PORT));
        ByteBuffer msg;
        for (int i = 1; i <= 3; i++) {
            msg = ByteBuffer.wrap(("request" + i).getBytes());
            System.out.println("send-" + "request" + i);
            ByteBuffer resp = client.send(msg);
            byte[] retVal = new byte[resp.remaining()];
            resp.get(retVal);
            System.out.println("receive-" + new String(retVal, 0, retVal.length));
        }
    }
}

Output:

send-request1
receive-response1
send-request2
receive-response2
send-request3
receive-response3

Client is a small client-side utility class that encapsulates the logic of adding the header before sending a ByteBuffer. See the source: Client.java.
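The actual Client.java is not reproduced in this article. Purely as a hypothetical stand-in (my own sketch, not the author's class, and with a different constructor-based API), a minimal blocking client speaking the same length-header protocol might look like this:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;

// Hypothetical minimal stand-in for the Client utility class: a blocking
// SocketChannel that frames each request with a 4-byte length header and
// reads back one framed response.
public class SimpleClient {

    private final SocketChannel sc;

    public SimpleClient(String host, int port) throws IOException {
        sc = SocketChannel.open(new InetSocketAddress(host, port)); // blocking mode
    }

    public ByteBuffer send(ByteBuffer msg) throws IOException {
        // write: 4-byte length header, then the payload
        ByteBuffer frame = ByteBuffer.allocate(4 + msg.remaining());
        frame.putInt(msg.remaining());
        frame.put(msg);
        frame.flip();
        while (frame.hasRemaining()) {
            sc.write(frame);
        }
        // read: the length header first, then exactly that many payload bytes
        ByteBuffer lenBuf = ByteBuffer.allocate(4);
        readFully(lenBuf);
        lenBuf.flip();
        ByteBuffer resp = ByteBuffer.allocate(lenBuf.getInt());
        readFully(resp);
        resp.flip();
        return resp;
    }

    private void readFully(ByteBuffer buf) throws IOException {
        while (buf.hasRemaining()) {
            if (sc.read(buf) < 0) {
                throw new IOException("connection closed");
            }
        }
    }
}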

Summary

In this implementation, the dispatch method is synchronous and blocking! All I/O operations and all business logic run in the NIO thread (i.e., the Reactor thread). If the business processing is fast, this implementation is fine, since there is no cost of switching to user threads. But imagine business processing that is time-consuming (lots of database or disk operations): in that case the Reactor would be blocked, which is certainly not what we want. The fix is simple: handle the business logic asynchronously, i.e., hand it off to user threads.
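As a minimal sketch of that hand-off (my own assumption about the wiring, not code from the article), the user's Handler can be wrapped so the business logic runs on a worker pool:

import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: run business logic in a worker pool so the Reactor thread can
// return to select() immediately. Wraps the Handler interface used above.
public class AsyncHandler implements Handler {

    private final Handler business;
    private final ExecutorService workers = Executors.newFixedThreadPool(4);

    public AsyncHandler(Handler business) {
        this.business = business;
    }

    @Override
    public void processRequest(Processor processor, ByteBuffer msg) {
        // hand off to a worker thread; the Reactor thread does not block here
        workers.execute(() -> business.processRequest(processor, msg));
    }
}

It would plug in as new Reactor(PORT, 1024, new AsyncHandler(new MyHandler())); note that Processor.sendBuffer is already synchronized on the reactor, which is what makes calling it from a worker thread plausible in this sketch.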

Finally, the drawbacks of the single-threaded Reactor model:

  • From start to finish there is only one Reactor thread. The drawback is obvious: if the Reactor dies unexpectedly, the whole system stops working; reliability is poor.
  • The other problem with a single thread: under heavy load, the Reactor's processing speed inevitably becomes the system's performance bottleneck.


Author: TopGun_Viper
Link: https://www.jianshu.com/p/eb28811421e3
Source: Jianshu
Copyright belongs to the author. For any form of reproduction, please contact the author for authorization and cite the source.