1. 程式人生 > >Dubbo原始碼解析(一)請求排程器 Dispatcher

Dubbo原始碼解析(一)請求排程器 Dispatcher

排程器 Dispatcher

  1. 排程策略

    • all 所有訊息都派發到執行緒池,包括請求,響應,連線事件,斷開事件,心跳等。
    • direct 所有訊息都不派發到執行緒池,全部在 IO 執行緒上直接執行。
    • message 只有請求響應訊息派發到執行緒池,其它連線斷開事件,心跳等訊息,直接在 IO 執行緒上執行。
    • execution 只請求訊息派發到執行緒池,不含響應,響應和其它連線斷開事件,心跳等訊息,直接在 IO 執行緒上執行。
    • connection 在 IO 執行緒上,將連線斷開事件放入佇列,有序逐個執行,其它訊息派發到執行緒池。
  2. 執行緒池 ThreadPool

    • fixed 固定大小執行緒池,啟動時建立執行緒,不關閉,一直持有。(預設)
    • cached 快取執行緒池,空閒一分鐘自動刪除,需要時重建。
    • limited 可伸縮執行緒池,但池中的執行緒數只會增長不會收縮。只增長不收縮的目的是為了避免收縮時突然來了大流量引起的效能問題。
    • eager 優先建立Worker執行緒池。在任務數量大於corePoolSize但是小於maximumPoolSize時,優先建立Worker來處理任務。當任務數量大於maximumPoolSize時,將任務放入阻塞佇列中。阻塞佇列充滿時丟擲RejectedExecutionException。(相比於cached:cached在任務數量超過maximumPoolSize時直接丟擲異常而不是將任務放入阻塞佇列)

從上面可以看到所有的請求、響應、連線、斷開都經過Dispatcher之手。

那我想著具體看下Dispatcher的原始碼吧

@SPI(AllDispatcher.NAME)
public interface Dispatcher {

    /**
     * dispatch the message to threadpool.
     *
     * @param handler
     * @param url
     * @return channel handler
     */
    @Adaptive({Constants.DISPATCHER_KEY, "dispather"
, "channel.handler"}) // The last two parameters are reserved for compatibility with the old configuration ChannelHandler dispatch(ChannelHandler handler, URL url); }

哇哇哇哇
這個SPI是什麼東西啊?

Marker for extension interface 擴充套件介面標識

哦,擴充套件點,暫且認為是Bean這種東西吧
這個Adaptive又是什麼東西?

Decide which target extension to be injected. 決定注入那一個擴充套件點

哦 果然和@AutoWired差不多的意思

我想看有多少個實現,幹了什麼,先看下類圖吧

這裡寫圖片描述

每一種Dispatcher的策略都有一個Dispatcher和一個對應的ChannelHandler

all

回顧一下all的策略

所有訊息都派發到執行緒池,包括請求,響應,連線事件,斷開事件,心跳等。

看一下AllDispatcher的實現

public class AllDispatcher implements Dispatcher {

    public static final String NAME = "all";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new AllChannelHandler(handler, url);
    }

}

So Simple,Why,比如派發策略all應該整個邏輯都在AllChannelHandler這個邏輯裡面


public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request && t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = getExecutorService();
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

發現了,在下面這裡幾類時間的時候,獲取的執行器都是執行緒池,

     /**
     * 連線事件
     */
    public void connected(Channel channel) throws RemotingException {
    }

    /**
     * 斷開事件
     */
    public void disconnected(Channel channel) throws RemotingException {
    }

    /**
     * 發出訊息  內->外
     */
    public void sent(Channel channel, Object message) throws RemotingException {
    }

    /**
     * 接收外部訊息 外->內
     */
    public void received(Channel channel, Object message) throws RemotingException {
    }

    /**
     * 異常
     */
    public void caught(Channel channel, Throwable exception) throws RemotingException {
    }

都是下面的這個

    ExecutorService cexecutor = getExecutorService();

    private ExecutorService getExecutorService() {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }

這個執行緒池又是通過其父類WrappedChannelHandler的構造方法中建立的,


    public WrappedChannelHandler(ChannelHandler handler, URL url) {
        this.handler = handler;
        this.url = url;
        // 獲取執行緒池
        executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

        String componentKey = Constants.EXECUTOR_SERVICE_COMPONENT_KEY;
        if (Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY))) {
            componentKey = Constants.CONSUMER_SIDE;
        }
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        dataStore.put(componentKey, Integer.toString(url.getPort()), executor);
    }

可以通過程式碼看到,是通過擴充套件點載入的執行緒池。

@SPI("fixed")
public interface ThreadPool {

    /**
     * Thread pool
     *
     * @param url URL contains thread parameter
     * @return thread pool
     */
    @Adaptive({Constants.THREADPOOL_KEY})
    Executor getExecutor(URL url);

}

型別又是通過上面的ThreadPool擴充套件點定義中threadpool這個配置項配置的,可以像下面一樣進行配置

    <dubbo:protocol name="dubbo" port="20880" dispatcher="all" threadpool="cached" threads="2"/>

根據url中的配置,設定執行緒池的屬性,如下是CachedThreadPool這種型別的建立

  public Executor getExecutor(URL url) {
        // 執行緒名稱
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        // core size
        int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
        // 最大執行緒數
        int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
        // 執行緒佇列數
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        // 存活執行緒
        int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);
        return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS,
                queues == 0 ? new SynchronousQueue<Runnable>() :
                        (queues < 0 ? new LinkedBlockingQueue<Runnable>()
                                : new LinkedBlockingQueue<Runnable>(queues)),
                new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

這樣一套訊息進入派發流程就明白了

以上是all策略的派發流程,

direct

我們看看direct這種由IO直接處理的策略

想一想

首先,它不需要建立執行緒池,直接在IO執行緒處理,那麼IO執行緒的表現狀況是什麼?

回想一下,之前在寫最簡單的socket搞聊天的時候,

一開始用一個執行緒用於接收請求、處理並返回,這不就是direct這種策略麼

後面為了提升處理的效能,接受了請求,然後處理操作轉交到執行緒池手中,類似於all這種策略

這是一樣的

直接由IO處理,就是直接接收請求並處理,然後返回,是序列的,不需要執行緒池。

那麼直接返回handler就好了,不需要再進行包裝了

看下原始碼,驗證下是否是正確的

public class DirectDispatcher implements Dispatcher {

    public static final String NAME = "direct";

    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return handler;
    }

}

果然是這樣的,萬變不離其宗啊

message

只有請求響應訊息派發到執行緒池,其它連線斷開事件,心跳等訊息,直接在 IO 執行緒上執行。

那應該就是在recived事件上面使用執行緒池,其它直接返回

public class MessageOnlyChannelHandler extends WrappedChannelHandler {

    public MessageOnlyChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

}

execution

只請求訊息派發到執行緒池,不含響應,響應和其它連線斷開事件,心跳等訊息,直接在 IO 執行緒上執行。

public class ExecutionChannelHandler extends WrappedChannelHandler {

    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    public void connected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
    }

    public void disconnected(Channel channel) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
    }

    public void received(Channel channel, Object message) throws RemotingException {
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //TODO A temporary solution to the problem that the exception information can not be sent to the opposite end after the thread pool is full. Need a refactoring
            //fix The thread pool is full, refuses to call, does not return, and causes the consumer to wait for time out
            if(message instanceof Request &&
                    t instanceof RejectedExecutionException){
                Request request = (Request)message;
                if(request.isTwoWay()){
                    String msg = "Server side("+url.getIp()+","+url.getPort()+") threadpool is exhausted ,detail msg:"+t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
    }

}

==從原始碼上來看 execution 的描述應該是錯了==

應該是除了響應事件,其它都是線上程池執行

官方說是要修復了

connection

在 IO 執行緒上,將連線、斷開事件放入佇列,有序逐個執行,其它訊息派發到執行緒池。

public class ConnectionOrderedChannelHandler extends WrappedChannelHandler {

    protected final ThreadPoolExecutor connectionExecutor;
    private final int queuewarninglimit;

    public ConnectionOrderedChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
        String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        connectionExecutor = new ThreadPoolExecutor(1, 1,
                0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(url.getPositiveParameter(Constants.CONNECT_QUEUE_CAPACITY, Integer.MAX_VALUE)),
                new NamedThreadFactory(threadName, true),
                new AbortPolicyWithReport(threadName, url)
        );  // FIXME There's no place to release connectionExecutor!
        queuewarninglimit = url.getParameter(Constants.CONNECT_QUEUE_WARNING_SIZE, Constants.DEFAULT_CONNECT_QUEUE_WARNING_SIZE);
    }

    public void connected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    public void disconnected(Channel channel) throws RemotingException {
        try {
            checkQueueLength();
            connectionExecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnected event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
            //fix, reject exception can not be sent to consumer because thread pool is full, resulting in consumers waiting till timeout.
            if (message instanceof Request && t instanceof RejectedExecutionException) {
                Request request = (Request) message;
                if (request.isTwoWay()) {
                    String msg = "Server side(" + url.getIp() + "," + url.getPort() + ") threadpool is exhausted ,detail msg:" + t.getMessage();
                    Response response = new Response(request.getId(), request.getVersion());
                    response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                    response.setErrorMessage(msg);
                    channel.send(response);
                    return;
                }
            }
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown()) {
            cexecutor = SHARED_EXECUTOR;
        }
        try {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private void checkQueueLength() {
        if (connectionExecutor.getQueue().size() > queuewarninglimit) {
            logger.warn(new IllegalThreadStateException("connectionordered channel handler `queue size: " + connectionExecutor.getQueue().size() + " exceed the warning limit number :" + queuewarninglimit));
        }
    }
}