1. 程式人生 > >結合源碼分析 bubble 使用註意事項

結合源碼分析 bubble 使用註意事項

parameter pub IE 問題 art url 獲取 lap version

使用dubbo時候要盡量了解源碼,不然會很容易入坑。

一、服務消費端ReferenceConfig需要自行緩存

ReferenceConfig實例是個很重的實例,每個ReferenceConfig實例裏面都維護了與服務註冊中心的一個長鏈,並且維護了與所有服務提供者的的長鏈。假設有一個服務註冊中心和N個服務提供者,那麽每個ReferenceConfig實例裏面維護了N+1個長鏈,如果頻繁的生成ReferenceConfig實例,可能會造成性能問題,甚至產生內存或者連接泄露的風險。特別是使用dubbo api編程時候容易忽略這個問題。

為了解決這個問題,之前都是自行緩存,但是自從dubbo2.4.0版本後,dubbo 提供了簡單的工具類 ReferenceConfigCache 用於緩存ReferenceConfig 實例。使用如下:

/創建服務消費實例
ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
reference.setInterface(XxxService.class);
reference.setVersion("1.0.0");
......
//獲取dubbo提供的緩存
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
// cache.get方法中會緩存 reference對象,並且調用reference.get方法啟動ReferenceConfig,並返回經過代理後的服務接口的對象
XxxService xxxService = cache.get(reference); // 使用xxxService對象 xxxService.sayHello();

需要註意的是 Cache內持有ReferenceConfig對象的引用,不要在外部再調用ReferenceConfig的destroy方法了,這會導致Cache內的ReferenceConfig失效!

如果要銷毀 Cache 中的 ReferenceConfig ,將銷毀 ReferenceConfig 並釋放對應的資源,具體使用下面方法來銷毀

ReferenceConfigCache cache = ReferenceConfigCache.getCache();
cache.destroy(reference);

另外以服務 Group、接口、版本為緩存的 Key,ReferenceConfig實例為對應的value。如果你需要使用自定義的key,可以在創建cache時候調用ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator );方法傳遞自定義的keyGenerator。

二、 並發控制

2.1 服務消費方並發控制 在服務消費方法進行並發控制需要設置actives參數,如下:

<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000" actives="10"/>

設置com.test.UserServiceBo接口中所有方法,每個方法最多同時並發請求10個請求。

也可以使用下面方法設置接口中的單個方法的並發請求個數,如下:

<dubbo:reference id="userService" interface="com.test.UserServiceBo"
        group="dubbo" version="1.0.0" timeout="3000">
                <dubbo:method name="sayHello" actives="10" />
</dubbo:reference>

如上設置sayHello方法的並發請求數量最大為10,如果客戶端請求該方法並發超過了10則客戶端會被阻塞,等客戶端並發請求數量少於10的時候,該請求才會被發送到服務提供方服務器。在dubbo中客戶端並發控制是使用ActiveLimitFilter過濾器來控制的,代碼如下:

public class ActiveLimitFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        //獲取設置的acvites的值,默認為0
        int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
        //獲取當前方法目前並發請求數量
        RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
        if (max > 0) {//說明設置了actives變量
            long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
            long start = System.currentTimeMillis();
            long remain = timeout;
            int active = count.getActive();
            //如果該方法並發請求數量大於設置值,則掛起當前線程。
            if (active >= max) {
                synchronized (count) {
                    while ((active = count.getActive()) >= max) {
                        try {
                            count.wait(remain);
                        } catch (InterruptedException e) {
                        }
                        //如果等待時間超時,則拋出異常
                        long elapsed = System.currentTimeMillis() - start;
                        remain = timeout - elapsed;
                        if (remain <= 0) {
                            throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                    + invoker.getInterface().getName() + ", method: "
                                    + invocation.getMethodName() + ", elapsed: " + elapsed
                                    + ", timeout: " + timeout + ". concurrent invokes: " + active
                                    + ". max concurrent invoke limit: " + max);
                        }
                    }
                }
            }
        }
        //沒有限流時候,正常調用
        try {
            long begin = System.currentTimeMillis();
            RpcStatus.beginCount(url, methodName);
            try {
                Result result = invoker.invoke(invocation);
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                return result;
            } catch (RuntimeException t) {
                RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                throw t;
            }
        } finally {
            if (max > 0) {
                synchronized (count) {
                    count.notify();
                }
            }
        }
    }

}

可知客戶端並發控制,是如果當並發量達到指定值後,當前客戶端請求線程會被掛起,如果在等待超時期間並發請求量少了,那麽阻塞的線程會被激活,然後發送請求到服務提供方,如果等待超時了,則直接拋出異常,這時候服務根本都沒有發送到服務提供方服務器。

三、 改進的廣播策略

前面我們講解集群容錯時候談到有一個廣播策略,該策略主要用於對所有服務提供者進行廣播消息,那麽有個問題需要思考,廣播是是說你在客戶端調用接口一次,內部就是輪詢調用所有服務提供者的機器的服務,那麽你調用一次該接口,返回值是什麽那?比如內部輪詢了10臺機器,每個機器應該都有一個返回值,那麽你調用的這一次返回值是10個返回值的組成?其實不是,返回的是輪詢調用的最後一個機器結果,那麽如果我們想把所有的機器返回的結果聚合起來如何做的?

結合源碼分析 bubble 使用註意事項