1. 程式人生 > >dubbo原始碼分析-客戶端DubboInvoker呼叫服務端體會Netty的非阻塞IO使用

dubbo原始碼分析-客戶端DubboInvoker呼叫服務端體會Netty的非阻塞IO使用

本文會介紹Dubbo客戶端DubboInvoker呼叫服務端時候非同步同步呼叫,藉此理解Netty的阻塞非阻塞用法

先來看官網的描述:

這裡寫圖片描述

這裡寫圖片描述

上面的描述對應實現在DubboInvoker類。

DubboInvoker doInvoke(final Invocation invocation)方法:

....
    try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int
timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null
); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout) ; RpcContext.getContext().setFuture(new FutureAdapter<Object>(future)); return new RpcResult(); } else
{ RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } ....

關鍵引數說明:

1)isOneway: oneway呼叫,只發送請求,不接收返回結果 //RpcContext中如此描述。
2)isAsync: 非同步呼叫
3)else: 本地呼叫(同步呼叫),執行緒等待返回結果

isOneway的用法 (只發送,不返回結果,傳送時候阻塞保證傳送成功)

對於1) currentClient.send(inv, isSent) 中的isSent引數作用:-
設定是否等待訊息發出:(非同步總是不等待返回結果)
● sent=”true” 等待訊息發出,訊息傳送失敗將丟擲異常。
● sent=”false” 不等待訊息發出,將訊息放入IO佇列,即刻返回。

注意:
首先,是否等待訊息發出與是否接收返回結果是兩回事。 例如, 該isOneway的做法是不接收返回結果,但是如果isSent=true則等待訊息發出;
其次,對與oneway呼叫,NettyChannel 中send方法裡的 sent = true
NettyChannel send(Object message, boolean sent)方法:

....
  try {
            ChannelFuture future = channel.write(message);//傳送時候不阻塞
            if (sent) {
                timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                success = future.await(timeout); // 執行緒執行此會等待訊息發出,同步
            }
....

(HeaderExchangeChannel send –》到 NettyChannel send)

isAsync的用法 (呼叫傳送不阻塞,傳送時候不阻塞,獲取返回結果時候阻塞, 獲取返回結果的位置靈活,結果放在RpcContext中)

對於2)返回結果包裝在Future中,並且Future放入到RpcContext中,RpcContext是執行緒安全,因為RpcContext裡用ThreadLocal儲存該RpcContext的具體內容

    ResponseFuture future = currentClient.request(inv, timeout) ; //呼叫傳送不阻塞
    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));  

以上兩個方法都是非阻塞的,也就是程式沒有任何等待直接執行。注意,對與Async呼叫,NettyChannel 中send方法裡的 sent=false

什麼時候取回該結果? 按官網描述可知,用RpcContext.getFuture.get()則可以獲取。但是當使用這個方法呼叫位置是阻塞的,因為要等待結果。

(HeaderExchangeChannel request–》到 NettyChannel send)

本地呼叫, (呼叫傳送不阻塞,傳送時候不阻塞,獲取返回結果時候阻塞,獲取返回結果的位置不靈活)

對於3)本地呼叫(同步呼叫),執行緒等待返回結果
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();

注意,get()方法會阻塞,執行緒執行到這裡等待返回傳送結果。
注意,
首先,對於本地呼叫(同步呼叫),NettyChannel 中send方法裡的 sent=false。
其次,重點理解!! currentClient.request(inv, timeout)裡執行的是Netty裡的非阻塞呼叫ChannelFuture future = channel.write(message); 而程式並沒有在netty的呼叫層次上阻塞(估計目的是讓netty高併發處理),但是get()會在主程式執行位置阻塞直到返回結果,該實現是dubbo的DefaultFuture自己實現,與Netty不相關。可以看出,為了等待返回結果,該阻塞是在業務執行緒裡控制,並不涉及Netty的阻塞用法。