1. 程式人生 > >原始碼分析Dubbo非同步呼叫與事件回撥機制

原始碼分析Dubbo非同步呼叫與事件回撥機制

   本文將詳細分析Dubbo服務非同步呼叫與事件回撥機制。
   1、非同步呼叫與事件回撥機制
   1.1 非同步回撥
這裡寫圖片描述
這裡寫圖片描述
   1.2 事件回撥
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述
這裡寫圖片描述
   2、原始碼分析非同步呼叫與事件回撥機制
   在Dubbo中,引入特定的過濾器FutureFilter來處理非同步呼叫相關的邏輯,其定義如下:

@Activate(group = Constants.CONSUMER)
public class FutureFilter implements Filter {
}

   group=CONSUMER說明該過濾器屬於消費端過濾器。
   接下來從從invoke方法詳細分析其實現邏輯。

public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);     // @1
        fireInvokeCallback(invoker, invocation);                                                     // @2
        // need to configure if there's return value before the invocation in order to help invoker to judge if it's
// necessary to return future. Result result = invoker.invoke(invocation); // @3 if (isAsync) { asyncCallback(invoker, invocation); // @4 } else { syncCallback(invoker, invocation, result); // @5
} return result; }

   程式碼@1:首先從URL中獲取是否是非同步呼叫標誌,其配置屬性為< dubbo:service async=”“/>獲取其子標籤< dubbo:method async=”“/>。
   程式碼@2:同步呼叫oninvoke事件,執行invoke方法之前的事件。
   程式碼@3:繼續沿著呼叫鏈呼叫,最終會到具體的協議Invoker,例如DubboInvoker,發生具體的服務呼叫,跟蹤一下同步、非同步呼叫的實現細節。
   程式碼@4:如果呼叫方式是非同步模式,則非同步呼叫onreturn或onthrow事件。
   程式碼@5:如果呼叫方式是同步模式,則同步呼叫onreturn或onthrow事件。

   2.1 原始碼分析FutureFilter#fireInvokeCallback

private void fireInvokeCallback(final Invoker<?> invoker, final Invocation invocation) {
        final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), 
Constants.ON_INVOKE_METHOD_KEY));      // @1
        final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), 
Constants.ON_INVOKE_INSTANCE_KEY));   // @2
        if (onInvokeMethod == null && onInvokeInst == null) {    // @3
            return;
        }
        if (onInvokeMethod == null || onInvokeInst == null) {    // @4
            throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : 
                         "instance") + " found. url:" + invoker.getUrl());
        }
        if (!onInvokeMethod.isAccessible()) {
            onInvokeMethod.setAccessible(true);
        }
        Object[] params = invocation.getArguments();
        try {
            onInvokeMethod.invoke(onInvokeInst, params);        // @5
        } catch (InvocationTargetException e) {
            fireThrowCallback(invoker, invocation, e.getTargetException());    // @6
        } catch (Throwable e) {
            fireThrowCallback(invoker, invocation, e);                         // @7
        }
    }

   程式碼@1:StaticContext.getSystemContext()中根據key:serviceKey + “.” + method + “.” + “oninvoke.method” 獲取配置的oninvoke.method方法名。其中serviceKey為[group]/interface:[version],其中group與version可能為空,忽略。
   程式碼@2:同樣根據key:serviceKey + “.” + method + “.” + “oninvoke.instance” 從StaticContext.getSystemContext()獲取oninvoke.method方法所在的例項名物件,也就是說該呼叫哪個物件的oninvoke.method指定的方法。這裡就有一個疑問,這些資料是在什麼時候存入StaticContext中的呢?下文會詳細分析。
   程式碼@3、@4:主要檢測< dubbo:method oninvoke=”“/>配置的正確性,其正確的配置方式如下:“例項名.方法名”,例如:
這裡寫圖片描述
   程式碼@5:根據發射機制,呼叫oninvoke中指定的例項的指定方法,注意,這裡傳入的引數為呼叫遠端RPC服務的引數。
注意:從這裡可以看出,如果要實現事件通知,也即在呼叫遠端RPC服務之前,之後、丟擲異常時執行回撥函式,該回調事件的方法的引數列表需要與被呼叫服務的引數列表一致。
   程式碼@6、@7,如果在執行呼叫前方法(oninvoke)事件方法失敗,則會同步呼叫onthrow中定義的方法(如有定義)。關於dubbo:method oninvoke屬性的解析以及在什麼時候會向StaticContext.getSystemContext()中新增資訊,將在下文統一介紹。
   2.2 原始碼分析DubboInvoker關於同步非同步呼叫處理
   在上文提到FutureFilter#invoke中的第三步呼叫invoker.invoker方法時,我們應該會有興趣瞭解一下真實的invoker是如何處理同步、非同步請求的。
   我們以dubbo協議DubboInvoker來重點分析一下其實現原理:
   DubboInvoker#doInvoke

try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);            // @1
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);                                             // @2
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
                ResponseFuture future = currentClient.request(inv, timeout);                 // @3
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
                RpcContext.getContext().setFuture(null);     // @4
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }

   程式碼@1:首先獲取async屬性,如果為true表示非同步請求,如果配置了return=”false”表示呼叫模式為oneway,只發呼叫,不關注其呼叫結果。
   程式碼@2:處理oneway的情況。如果設定了sent=true,表示等待網路資料發出才返回,如果sent=false,只是將待發送資料發到IO寫快取區就返回。
   程式碼@3:處理非同步的情況,程式碼@4處理同步呼叫的情況,細看其實都是通過呼叫網路客戶端client的request,最終呼叫HeaderExchangeChannel#request方法:
這裡寫圖片描述
   這裡是通過Future模式來實現非同步呼叫的,同步呼叫也是通過非同步呼叫來實現,只是同步呼叫發起後,直接呼叫future#get的方法來同步等待結果的返回,而非同步呼叫只返回Future Response,在使用者需要關心其結果時才呼叫get方法。

   2.3 原始碼分析asyncCallback與syncCallback
   前面介紹了方法執行之前oninvoker事件的呼叫分析,接下來分析RPC服務呼叫完成後,onreturn和onthrow方法的呼叫邏輯。
   非同步回撥與同步回撥的區別就是呼叫onreturn(fireReturnCallback)和onthrow(fireThrowCallback)呼叫的地方不同,如果是同步呼叫,也就是在完成RPC服務呼叫後,立即呼叫相關的回撥方法,如果是非同步呼叫的話,RPC服務完成後,通過Future模式非同步執行。其實關於onreturn、onthrow屬性的解析,執行與oninvoker屬性的解析完全一樣,再這裡也就不重複介紹了。