1. 程式人生 > >JavaNIO--4.多執行緒Reactor模式

JavaNIO--4.多執行緒Reactor模式

單執行緒VS多執行緒

寫在前面:
這裡寫圖片描述
這裡寫圖片描述

也就是說多執行緒實現echo伺服器實際上是不太科學的選擇,但是本文依舊是實現了一個echo伺服器。為了不誤人子弟,所以請謹慎觀看第二部分——自己實現的多執行緒echo伺服器。

這裡寫圖片描述

本質上這是一個同步的多路複用的I/O模式,其優點是在I/O階段沒有阻塞,而在選擇器呼叫select()方法時會阻塞,並且在資料處理的階段,因為其是同步設計,所以會佔用CPU直到資料處理結束後才進入下一次選擇分發。

我們知道,實際上系統I/O是進行了兩個階段,可以參考我寫的JavaI/O模型,而非阻塞式I/O只是第一個階段——資料是否在核心空間準備完畢,是非阻塞的,而I/O的第二個階段——從核心空間複製到使用者空間(NIO中就是從Channel

讀取到Buffer),依舊是會佔用CPU的,從某種意義上來講,這個過程是與Reactor本身功能無關的,所以我們就認為這個過程資料處理的過程是可以通過其他執行緒完成的。
因此推出了一種新的模型,多線Reactor模型。

這裡寫圖片描述

這是Doug Lea大神提出的一種模型,可以看出他的設計思路是,I/O過程依舊是在主執行緒中的,而伺服器實現功能能的過程(decode,computeencode)使用了多執行緒。這是一種很經典的伺服器實現思路,因為資料處理的過程是耗時比較多的,而使用了非阻塞式I/O的讀寫過程耗時很少,所以資料處理過程交給了其他執行緒。

1.模型程式碼

1.1Reactor模型

Reactor.java

這段是Doug Lea設計模型中的Reactor(反應器)

public class Reactor implements Runnable{

    // final變數一定要在構造器中初始化
    // 若為static final則一定要直接初始化或者在static程式碼塊中初始化
    final Selector selector;
    final ServerSocketChannel serverSocket;

    public Reactor(int port) throws IOException {
        // TODO Auto-generated constructor stub
selector = Selector.open(); serverSocket = ServerSocketChannel.open(); serverSocket.socket().bind(new InetSocketAddress(port)); serverSocket.configureBlocking(false); SelectionKey sKey = serverSocket.register(selector, SelectionKey.OP_ACCEPT); sKey.attach(new Acceptor()); } /** * DispatchLoop * 派發迴圈,迴圈呼叫dispatch()方法 */ @Override public void run() { // TODO Auto-generated method stub try { while(!Thread.interrupted()) { selector.select(); Set<SelectionKey> selected = selector.selectedKeys(); Iterator<SelectionKey> iterator = selected.iterator(); while(iterator.hasNext()) { dispatch(iterator.next()); } // 清空selector的興趣集合,和使用迭代器的remove()方法一樣 selected.clear(); } } catch (Exception e) { // TODO: handle exception } } /** * 派發任務,相當於判斷條件以後再呼叫指定方法 * 使用dispatch()可以無差別的直接派發任務到指定物件並且呼叫指定方法 * 例如:Accept的接收方法,Handler的處理報文的方法 * @param key */ private void dispatch(SelectionKey key) { System.out.println("釋出了一個新任務"); Runnable r = (Runnable)(key.attachment()); if (r != null) { r.run(); } } class Acceptor implements Runnable{ @Override public void run() { // TODO Auto-generated method stub try { SocketChannel socketChannel = serverSocket.accept(); if (socketChannel != null) { /** * 每次new一個Handler相當於先註冊了一個key到selector * 而真正進行讀寫操作傳送操作還是依靠DispatchLoop實現 */ new Handler(selector, socketChannel); } } catch (Exception e) { // TODO: handle exception } } } }

可以看到這段程式碼和我們之前熟悉的NIO操作一樣,由經典的DispatchLoop進行輪詢通道狀態並派發任務,由Acceptor 接收請求並建立Handler物件。

1.Handler模型

HandlerWithThreadPool.java

這是實際的資料處理類。

public class HandlerWithThreadPool implements Runnable{

    final SocketChannel socket;
    final Selector selector;
    final SelectionKey key;
    final ByteBuffer inputBuffer = ByteBuffer.allocate(1024);
    final ByteBuffer outputBuffer = ByteBuffer.allocate(1024);

    // 初始化一個執行緒池
    static ThreadPoolExecutor pool = new ThreadPoolExecutor(10, 20, 100, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());

    // 狀態碼,分別對應讀狀態,寫狀態和處理狀態
    static final int READING = 1;
    static final int SENDING = 2;
    static final int PROCESSING = 3;
    // 初始的狀態碼是READING狀態,因為Reactor分發任務時新建的Handler肯定是讀就緒狀態
    private int state = READING;        

    public HandlerWithThreadPool(SocketChannel socket,Selector selector) throws IOException {
        this.socket = socket;
        this.selector = selector;

        key = socket.register(selector, 0);
        socket.configureBlocking(false);
        key.interestOps(SelectionKey.OP_READ);
        // attach(this)是為了dispatch()呼叫
        key.attach(this);
    }

    /** 判斷讀寫資料時候完成的方法 **/
    private boolean inputIsCompelete() {return true;}
    private boolean outputIsCompelete() {return true;}

    /** 對資料的處理類,比如HTTP伺服器就會返回HTTP報文 **/
    private void process() {
        // 自己實現的伺服器功能
    }

    /**
     * 讀入資料,確定通道內資料讀完以後
     * 狀態碼要變為 PROCESSING
     * 需要特別注意的是,本方法是在Reactor執行緒中執行的
     * 
     * @throws IOException
     */
    void read() throws IOException {
        socket.read(inputBuffer);
        if (inputIsCompelete()) {
            state = PROCESSING;
            pool.execute(new Processer());
        }
    }

    /**
     * 這個方法呼叫了process()方法
     * 而後修改了狀態碼和興趣操作集
     * 注意本方法是同步的,因為多執行緒實際執行的是這個方法
     * 如果不是同步方法,有可能出現
     */
    synchronized void processAndHandOff() {
        process();
        state = SENDING;
        key.interestOps(SelectionKey.OP_WRITE);
    }

    /**
     * 這個內部類完全是為了使用執行緒池
     * 這樣就可以實現資料的讀寫在主執行緒內
     * 而對資料的處理在其他執行緒中完成
     */
    class Processer implements Runnable {
        public void run() {
            processAndHandOff();
        }
    }

    @Override
    public void run() {
        if (state == READING) {
            try {
                read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }else if (state == SENDING) {
            // 完成資料的傳送即可
        }
    }

}

以上程式碼就是模型中的Handler,我們來分析一下程式碼。

  1. HandlerWithThreadPool 內定義了一個靜態的執行緒池,而靜態的執行緒池其實存在於方法區中,也就是說並不是每一個物件都有一個方執行緒池,而是整個程序中只有一個執行緒池。
  2. HandlerWithThreadPoolrun()方法並不是為了多執行緒使用,而是為了Reactor中的dispatch()方法呼叫。
  3. 真正多執行緒執行的方法是Processer 內部類中的run()方法,這樣就能體現出對通道資料的讀寫I/O過程是在主執行緒中(Reactor執行緒),而對於資料的處理(解碼編碼,伺服器功能實現),是線上程池的執行緒中。

2.自己實現的多執行緒echo伺服器(基於Reactor模式)

2.1 Reactor

public class NioReactor2 {

    private Selector selector;
    private ServerSocketChannel serverSocketChannel;

    public NioReactor2(int port) throws IOException {
        // 初始化Selector和Channel,並完成註冊
        selector = Selector.open();
        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        SelectionKey selectionKey = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey.attach(new Acceptor());
        serverSocketChannel.bind(new InetSocketAddress(port));
    }

    /**
     * 輪詢分發任務
     * @throws IOException
     */
    private void dispatchLoop() throws IOException {
        while(true) {
            selector.select();
            Set<SelectionKey> selectionKeys = selector.selectedKeys();
            Iterator<SelectionKey> iterator = selectionKeys.iterator();

            while(iterator.hasNext()) {
                SelectionKey selectionKey = iterator.next();
                dispatchTask(selectionKey);
            }
            selectionKeys.clear();
        }
    }

    /**
     * 任務分派器的進階版,耦合性降低,拓展性增強
     * 子類只需要實現Runnable介面,並重寫run()方法,就可以實現多種任務的無差別分派
     * 
     * @param selectionKey
     */
    private void dispatchTask(SelectionKey taskSelectionKey) {
        Runnable runnable = (Runnable)taskSelectionKey.attachment();
        if (runnable != null) {
            runnable.run();
        }
    }

    /**
     * Accept類,實際TCP連線的建立和SocketChannel的獲取在這個類中實現
     * 根據類的實現,可以發現一個Accept類對應一個ServerSocketChannel
     * 
     * @author CringKong
     *
     */
    private class Acceptor implements Runnable{

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                SocketChannel socketChannel = serverSocketChannel.accept();
                if (socketChannel != null) {
                    // 建立一個新的處理類
                    new NewHandler(socketChannel, selector);
                }
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
            }
        }
    }



    public static void main(String[] args) {
        NioReactor2 reactor;
        try {
            reactor = new NioReactor2(12345);
            reactor.dispatchLoop();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

2.2 Handler

class NewHandler implements Runnable {

    private SocketChannel socketChannel;
    private SelectionKey selectionKey;
    private ByteBuffer oldBuffer;
    private static final ExecutorService pool = Executors.newFixedThreadPool(4);

    /** 這裡使用了狀態碼來防止多執行緒出現數據不一致等問題 **/
    static final int PROCESSING = 1;
    static final int PROCESSED = 2;
    private volatile int state = PROCESSED;

    public NewHandler(SocketChannel socketChannel, Selector selector) throws IOException {

        // 初始化的oldBuffer為null
        oldBuffer = null;
        this.socketChannel = socketChannel;
        socketChannel.configureBlocking(false);

        // 在建構函式裡就註冊通道到Selector
        this.selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
        // attach(this)將自身物件繫結到key上,作用是使dispatch()函式正確使用
        selectionKey.attach(this);
        // Selector.wakeup()方法會使阻塞中的Selector.select()方法立刻返回
        selector.wakeup();
    }

    // 使用執行緒池執行
    @Override
    public void run() {
        if (state == PROCESSED) {
            // 如果此時沒有執行緒在處理該通道的本次讀取,就提交申請到執行緒池進行讀寫操作
            pool.execute(new process(selectionKey));
        }else {
            // 如果此時有執行緒正在進行讀寫操作,就直接return,選擇器會進行下一次選擇和任務分派
            return;
        }
    }

    /**
     * 內部類實現對通道資料的讀取處理和傳送
     * 
     * @author CringKong
     *
     */
    private class process implements Runnable {

        private SelectionKey selectionKey;

        public process(SelectionKey selectionKey) {
            this.selectionKey = selectionKey;
            state = PROCESSING;
        }

        /**
         * 這是一個同步方法,因為在reactor中的選擇器有可能會出現一種狀況:
         * 當process執行緒已經要對某通道進行讀寫的時候,有可能Selector會再次選擇該通道
         * 因為此時該process執行緒還並沒有真正的進行讀寫,會導致另一執行緒重新建立一個process
         * 並試圖進行讀寫操作,此時就會出現cpu資源浪費的情況,或者出現異常,因為執行緒1在讀取通道內容的時候
         * 執行緒2就會被阻塞,而等到執行緒2執行操作的時候,執行緒1已經對通道完成了讀寫操做
         * 因此可以通過設定物件狀態碼來防止出現這些問題
         * 
         * @param selectionKey
         * @throws IOException
         * @throws InterruptedException 
         */
        private synchronized void readDate(SelectionKey selectionKey) throws IOException, InterruptedException {

            ByteBuffer newBuffer = ByteBuffer.allocate(64);

            int read;
            while ((read = socketChannel.read(newBuffer)) <= 0) {
                state = PROCESSED;
                return;
            }

            newBuffer.flip();
            String line = readLine(newBuffer);
            if (line != null) {

                // 如果這次讀到了行結束符,就將原來不含有行結束符的buffer合併位一行
                String sendData = readLine(mergeBuffer(oldBuffer, newBuffer));
                if (readLineContent(sendData).equalsIgnoreCase("exit")) { // 如果這一行的內容是exit就斷開連線
                    socketChannel.close();
                    state = PROCESSED;
                    return;
                }
                // 然後直接傳送回到客戶端
                ByteBuffer sendBuffer = ByteBuffer.wrap(sendData.getBytes("utf-8"));
                while (sendBuffer.hasRemaining()) {
                    socketChannel.write(sendBuffer);
                }
                oldBuffer = null;
            } else {
                // 如果這次沒讀到行結束付,就將這次讀的內容和原來的內容合併
                oldBuffer = mergeBuffer(oldBuffer, newBuffer);
            }
            state = PROCESSED;
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                readDate(selectionKey);
            } catch (IOException | InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } 
        }

    }

    /**
     * 讀取ByteBuffer直到一行的末尾 返回這一行的內容,包括換行符
     * 
     * @param buffer
     * @return String 讀取到行末的內容,包括換行符 ; null 如果沒有換行符
     * @throws UnsupportedEncodingException
     */
    private static String readLine(ByteBuffer buffer) throws UnsupportedEncodingException {
        // windows中的換行符表示手段 "\r\n"
        // 基於windows的軟體傳送的換行符是會是CR和LF
        char CR = '\r';
        char LF = '\n';

        boolean crFound = false;
        int index = 0;
        int len = buffer.limit();
        buffer.rewind();
        while (index < len) {
            byte temp = buffer.get();
            if (temp == CR) {
                crFound = true;
            }
            if (crFound && temp == LF) {
                // Arrays.copyOf(srcArr,length)方法會返回一個 源陣列中的長度到length位 的新陣列
                return new String(Arrays.copyOf(buffer.array(), index + 1), "utf-8");
            }
            index++;
        }
        return null;
    }

    /**
     * 獲取一行的內容,不包括換行符
     * 
     * @param buffer
     * @return String 行的內容
     * @throws UnsupportedEncodingException
     */
    private String readLineContent(String line) throws UnsupportedEncodingException {
        System.out.print(line);
        System.out.print(line.length());
        return line.substring(0, line.length() - 2);
    }

    /**
     * 對傳入的Buffer進行拼接
     * 
     * @param oldBuffer
     * @param newBuffer
     * @return ByteBuffer 拼接後的Buffer
     */
    public static ByteBuffer mergeBuffer(ByteBuffer oldBuffer, ByteBuffer newBuffer) {
        // 如果原來的Buffer是null就直接返回
        if (oldBuffer == null) {
            return newBuffer;
        }
        // 如果原來的Buffer的剩餘長度可容納新的buffer則直接拼接
        newBuffer.rewind();
        if (oldBuffer.remaining() > (newBuffer.limit() - newBuffer.position())) {
            return oldBuffer.put(newBuffer);
        }

        // 如果不是以上兩種情況就構建新的Buffer進行拼接
        int oldSize = oldBuffer != null ? oldBuffer.limit() : 0;
        int newSize = newBuffer != null ? newBuffer.limit() : 0;
        ByteBuffer result = ByteBuffer.allocate(oldSize + newSize);

        result.put(Arrays.copyOfRange(oldBuffer.array(), 0, oldSize));
        result.put(Arrays.copyOfRange(newBuffer.array(), 0, newSize));

        return result;
    }

}

可以看到Reactor和模型程式碼的Reactor實現並無二致,而Handler的設計思路是另一種多執行緒模式。

這種多執行緒模式是將讀寫過程完全交給另外的執行緒,而主執行緒只負責分發任務,這樣設計可以說實現了半非同步,因為讀寫過程也不會佔用主執行緒(reactor)的CPU,同時通過選擇器再進行選擇。但它並不是完全意義上的非同步I/O,因為從作業系統的角度上來講,並沒有實現回撥函式和底層的非同步I/O過程。

我們來分析一下程式碼。

  1. NewHandler物件對於一個Scoket連線只建立一次,其中的run()方法並不是為了使用多執行緒。
  2. process內部類是實際的對資料讀寫操作的物件,無論是讀寫資料還是資料操作,全部是在這個類內實現,並且是多執行緒進行操作,也就是說SocketChannel的所有操作都不在主執行緒內完成。
  3. 需要注意的是併發量較小的時候,dispatchLoop會出現執行緒重入的問題,也就是說本身提交到執行緒1中的SocketChannel還未進行讀寫操作,此時dispatchLoop認為該通道依舊處於就緒狀態,而導致執行緒2重新進入了同一個SocketChannel的讀寫操作,就會出現異常,解決方案是使用狀態碼和同步方法進行讀寫操作。

參考資料: