1. 程式人生 > >dubbo分執行緒池處理同一服務請求隨記

dubbo分執行緒池處理同一服務請求隨記

  • 前言
    最近在看release it,第二章中的案例提到底層服務被資料庫阻塞後把服務執行緒池全部佔滿並導致上層應用一直阻塞,結合自己部門的線上服務考慮,比如一個服務下會http請求外部應用,根據不同引數會處理時間長短會不同,這樣考慮從dubbo中間層做一個保護,比如把處理時間會很長的這種引數的請求歸類到一個執行緒池下,其他的歸類到另外一個執行緒池下,這樣即使處理時間長的請求一直阻塞起碼不影響到其他的功能。
    當然還有其他解決辦法:

    • 增加執行緒池大小或者加機器,但是無法預知什麼時間點會增加到什麼量,即使增加執行緒池大小也無法根治
    • 根據處理時長分服務處理,目前dubbo對於同一應用服務都是使用一個執行緒池處理接收請求,分兩個應用代價太大
    • 減小超時時長,一定程度上可以解決,很有可能大部分正常請求也無法正常響應,比如99%執行時長在7s而設定超時時長為2s顯然不太合適
    • 變同步為非同步,這當然是最能根治的辦法,但是從使用者互動及後臺開發流程都得改變,代價略大
  • dubbo網路層結構

    dubbo執行緒模型
    dubbo接收請求基本流程
    netty處理接收請求->NioServerSocketPipelineSink$Boss->註冊NioWorker執行緒用於處理長連線和IO操作->DefaultChannelPipeline(處理請求的反序列化和分發handler,初始化過程見NettyServer,會往pipeline加入decoder、encoder和NettyHandler)->decoder做反序列化->NettyHandler處理請求->NettyServer處理請求(NettyServer初始化封裝了MultiMessageHandler、HeartbeatHandler以及根據dispatcher得到的channelHandler,預設是AllDispatcher)->MultiMessageHandler for each處理批量請求->HeartbeatHandler處理心跳請求->AllDispatcher得到的AllChannelHandler處理將通道所有狀態變更用新執行緒處理(包括建立連線、斷開連線、接收請求、異常)->從指定的執行緒池拿到執行緒執行rpc請求->HeaderExchangeHandler判斷雙向和單向處理請求,雙向則將結果使用當前channel回寫->到DubboProtocol$

    ExchangeHandlerAdapter處理請求呼叫->到此服務端網路傳輸層結束

  • 如何自定義一個Dispatcher
    參見dubbo原始碼包/META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Dispather,並在protocol配置時指定為自定義的dispatcher。
    順帶一提,現已有的幾個Dispatcher所實現的功能:

    • AllDispather(注:Dispather不是我打錯了。。類名就是這個)
      轉發給AllChannelHandler,預設200大小的fixed執行緒池,可通過threadpool配置為其他型別執行緒池,由IOThread轉交,處理建立連線、關閉連線、處理接收請求、處理異常四種情況都會使用該執行緒池中的執行緒。
    • ConnectionOrderedDispather
      轉發給ConnectionOrderedChannelHandler,建立連線和關閉連線動作採用同一個單執行緒的執行緒池處理,線上程池佇列超過一定數量提供warn日誌,接收請求和異常處理使用配置的執行緒池。
    • DirectDispather
      直接使用IOThread處理請求。
    • ExecutionDispather
      功能與AllDispatcher類似,更像是AllDispatcher的一個老版本?少了異常處理和獲取executor不存在的處理。
    • MessageOnlyDispather
      只有接收請求才使用配置的執行緒池處理,其他使用IOThread處理。

    還沒理順的一個點是caught、send、receive、connection、disconnection分別是什麼點進入Dispatcher。

  • 如何通過自定義Dispatcher完成分執行緒池處理同一服務

    新建自定義Dispatcher和對應的handler,可以根據需要繼承AllDispatcher等,重寫handleReceive,如有必要可再自定義ThreadPool。
    先貼一段Dispatcher和Handler可實現方案,邏輯優化可以再考慮,比如框架不應該使用業務DTO這種,自己的執行緒池(保證全域性唯一,這裡TimeDecideChannelHandler是初始化構造的,所以成員屬性也是全域性唯一的)處理方案等。
    增加配置檔案:配置自定義dispather
    timeDecide=com.web.foo.dispatcher.TimeDecideDispatcher
    配置dispather:<dubbo:protocol name="dubbo" port="-1" accesslog="d:/access.log" dispather="timeDecide"></dubbo:protocol>

package com.web.foo.dispatcher;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.Dispather;
import com.web.foo.handler.TimeDecideChannelHandler;


public class TimeDecideDispatcher implements Dispather
{

     public static final String NAME = "timeDecide";

    @Override
    public ChannelHandler dispath(ChannelHandler handler, URL url)
    {
        return new TimeDecideChannelHandler(handler, url);
    }

}
package com.web.foo.handler;

import java.util.concurrent.ExecutorService;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool;
import com.alibaba.dubbo.remoting.Channel;
import com.alibaba.dubbo.remoting.ChannelHandler;
import com.alibaba.dubbo.remoting.ExecutionException;
import com.alibaba.dubbo.remoting.RemotingException;
import com.alibaba.dubbo.remoting.exchange.Request;
import com.alibaba.dubbo.remoting.transport.dispather.ChannelEventRunnable;
import com.alibaba.dubbo.remoting.transport.dispather.ChannelEventRunnable.ChannelState;
import com.alibaba.dubbo.remoting.transport.dispather.WrappedChannelHandler;
import com.alibaba.dubbo.rpc.Invocation;
import com.web.foo.dto.TestDto;

/**
 * @Project
 * @Description:
 * @Create 2015年7月19日下午6:25:17
 * @author zhoushaoyu
 * 
 */

public class TimeDecideChannelHandler extends WrappedChannelHandler
{
    FixedThreadPool tp = new FixedThreadPool();
    ExecutorService exe;
    /**
     * @param handler
     * @param url
     */
    public TimeDecideChannelHandler(ChannelHandler handler, URL url)
    {
        super(handler, url);
        url = url.addParameter(Constants.THREAD_NAME_KEY, "my-fixed-thread-pool");
        exe = (ExecutorService) tp.getExecutor(url);
    }

    public void connected(Channel channel) throws RemotingException
    {
        ExecutorService cexecutor = getExecutorService();
        try
        {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t)
        {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    public void disconnected(Channel channel) throws RemotingException
    {
        ExecutorService cexecutor = getExecutorService();
        try
        {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t)
        {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .",
                    t);
        }
    }

    public void received(Channel channel, Object message) throws RemotingException
    {
        ExecutorService cexecutor = getExecutorService(message);
        try
        {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t)
        {
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    public void caught(Channel channel, Throwable exception) throws RemotingException
    {
        ExecutorService cexecutor = getExecutorService();
        try
        {
            cexecutor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t)
        {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }

    private ExecutorService getExecutorService(Object message)
    {
        Object inv = message;
        if (message instanceof Request)
        {
            inv = ((Request) message).getData();
        }
        if (!(inv instanceof Invocation))
        {
            return getExecutorService();
        }
        for (Object p : ((Invocation) inv).getArguments())
        {
            if (p instanceof String && "longCall".equalsIgnoreCase((String) p))
            {

                return (ExecutorService) exe;
            }
            if (p instanceof TestDto && "longCall".equalsIgnoreCase(((TestDto) p).getTest()))
            {
                return (ExecutorService) exe;
            }
        }
        return getExecutorService();
    }

    private ExecutorService getExecutorService()
    {
        ExecutorService cexecutor = executor;
        if (cexecutor == null || cexecutor.isShutdown())
        {
            cexecutor = SHARED_EXECUTOR;
        }
        return cexecutor;
    }
}

可以看到最終的效果,如果是longCall進來就是在my-fixed-thread-pool,如果是其他的就在DubboServerHandler
最終效果

相關推薦

dubbo執行處理同一服務請求

前言 最近在看release it,第二章中的案例提到底層服務被資料庫阻塞後把服務執行緒池全部佔滿並導致上層應用一直阻塞,結合自己部門的線上服務考慮,比如一個服務下會http請求外部應用,根據不同引數會處理時間長短會不同,這樣考慮從dubbo中間層做一個保護

執行處理高併發請求

背景 本系統(支付系統)會在每個月特定時間(如賬單日某個時間)接收上游系統發起的大量請求並進行處理,並在處理完成後返回結果給上游系統。而本系統接收到請求進行處理的過程是呼叫第三方(支付公司)進行處理並獲取結果。 系統原實現方案沒有采用任何控制請求併發數的措施,接

Tomcat如何使用執行處理遠端併發請求

# Tomcat如何使用執行緒池處理遠端併發請求 通過了解學習tomcat如何處理併發請求,瞭解到執行緒池,鎖,佇列,unsafe類,下面的主要程式碼來自 java-jre: `sun.misc.Unsafe` `java.util.concurrent.ThreadPoolExecutor` `java

Dubbo裡面執行的拒絕策略

Dubbo裡面執行緒池的拒絕策略 public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { protected static final Logger logger = LoggerFactory.get

socket+執行,寫服務端和客戶端進行互動

以下內容轉自: https://www.cnblogs.com/gnoc/p/4866788.html 前言   socket(套接字),Socket和ServerSocket位於java.net包中,持續開啟服務端,接收來自客戶端的資訊,並響應。 最開始,咱們先來兩段最簡單的服

批量匯入大資料表的時候 使用執行處理

public class UpdateMebinfoThread implements Runnable{private int beginnum= 0;// 開始行數private int endnum= 0;// 讀取記錄數 public UpdateMebinfoTh

springboot之多工並行+執行處理

最近專案中做到一個關於批量發簡訊的業務,如果使用者量特別大的話,不能使用單執行緒去發簡訊,只能嘗試著使用多工來完成!我們的專案使用到了方式二,即Future的方案 Java 執行緒池 Java通過Executors提供四種執行緒池,分別為

使用BlockingQueue,多個執行同時處理同一型別的多個資源

如果是單執行緒處理一批事情,例如,有16個日誌需要處理,各個日誌之間是獨立的,假設處理每個的時間是1秒, 一共需要處理16秒才能處理完。 現在使用多執行緒來加速處理時間,思路: 建立4個執行緒,每個執行緒從一個任務列表中獲取一個任務,進行處理,處理完後,再獲取一個,直到任務

執行處理異常的策略

執行緒池的預設異常處理策略:       ThreadPoolExecutor.AbortPolicy,處理程式發生異常,丟擲異常RejectedExecutionExceptionThreadPoolExecutor.CallerRunsPolicy ,執行緒呼叫該任務的e

springcloud非同步執行、高併發請求feign解決方案

ScenTaskTestApplication.java package com.test; import org.springframework.boot.SpringApplication; import org.springframework.boot.a

Android--menu和OkHttp框架(未封裝),結合Executors(執行)實現網路請求的案例

涉及到的 知識點: 1.安卓UI元件menu 2.OkHttp框架 3.Executors(執行緒池) OkHttp結構簡介 案例程式碼 import android.os.Bundle; import android.suppo

java socket 服務端併發處理執行的使用

package yiwangzhibujian.threadserver; import java.io.InputStream; import java.net.ServerSocket; import java.net.Socket; import java.util.

springcloud系列—Hystrix—第3章-3: Hystrix 服務降級(fallback)與異常處理,Hystrix依賴隔離(命令名稱-分組和執行)、請求快取與清除快取、斷路器

資料參考:《Spring Cloud 微服務實戰》 目錄 服務降級 在HystrixCommand中可以通過過載getFallback()方法來實現服務降級邏輯。 在 HystrixObservableCommand 實現得 Hystrix 命令中,我們可以通過過載 resumenW

使用Java佇列來處理日誌資訊(執行的使用)

阿里的規範是使用new ThreadPoolExecutor()來建立執行緒池,二不是使用Excutor的靜態工具類來建立執行緒池,具體可以檢視部落格(兩篇):   https://blog.csdn.net/angus_Lucky/article/details/798

總結下java執行處理策略

執行緒池 任務提交給執行緒池之後的處理策略: 如果當前執行緒池中的執行緒數目<corePoolSize,則每來一個任務,就會建立一個執行緒去執行這個任務; 如果當前執行緒池中的執行緒數目>=corePoolSize,則每來一個任務,會嘗試將其新增到任務快取隊列當中,若

一個取消多生產者單消費者的日誌執行服務

package concurrent._ThreadPool.logService; import net.jcip.annotations.GuardedBy; import org.omg.PortableInterceptor.SYSTEM_EXCEPTION; import java.io.

Hystrix 服務的隔離策略對比,訊號量與執行隔離的差異

支援的隔離策略 Hystrix支援的 hytrix支援執行緒池隔離和訊號量隔離 訊號量的隔離:  it executes on the calling thread and concurrent requests are limited by the semaphore count  - 引自

Dubbo學習筆記8:Dubbo執行模型與執行策略

Dubbo預設的底層網路通訊使用的是Netty,服務提供方NettyServer使用兩級執行緒池,其中 EventLoopGroup(boss) 主要用來接受客戶端的連結請求,並把接受的請求分發給 EventLoopGroup(worker) 來處理,boss和worker執

深度解析Java執行的異常處理機制

前言 今天小夥伴遇到個小問題,執行緒池提交的任務如果沒有catch異常,那麼會拋到哪裡去,之前倒是沒研究過,本著實事求是的原則,看了一下程式碼。 正文 小問題 考慮下面這段程式碼,有什麼區別呢?你可以猜猜會不會有異常打出呢?如果打出來的話是在哪裡?: ExecutorSe

使用spring非同步請求處理以及執行所帶來的坑以及利用visualvm監測執行及效能【草稿】

前言 在開始前,請先看看: 因為涉及到執行緒,估計肉眼是沒辦法觀測的了只能用工具了。 預設執行緒池的坑 非同步web開發專題及tomcat下的spring非同步請求配置勘誤 當你按照上文高高興興地配置好非同步支援然後想要開啟高併發的人生,我只能說你圖樣圖森破