1. 程式人生 > >dubbo原始碼分析22 -- consumer 傳送與接收原理

dubbo原始碼分析22 -- consumer 傳送與接收原理

在前面的文章中,我們分析了 dubbo 從 provider 進行服務暴露,然後把服務資訊註冊到註冊中心上面解耦 consumer 與 provider 的呼叫。consumer 通過 javassist 建立代理物件引用遠端服務。當通過代理物件呼叫遠端服務的時候,講到進行真正呼叫的時候 dubbo 抽象出叢集容錯(ClusterDirectoryRouterLoadBalance)從服務多個暴露方選取出一個合適的 Invoke 來進行呼叫。 dubbo 預設是通過 FailoverClusterInvoker 從多個 Invoke 中選擇出一個 Invoke 例項 InvokerWrapper

 來進行遠端呼叫。本次分析主要包括以下 4 個部分:

  • consumer 傳送擴充套件
  • consumer 傳送原理
  • consumer 接收原理
  • dubbo 非同步變同步

1、consumer 傳送擴充套件

我們先來看一下 dubbo 中 consumer 端的請求傳送原理,也就是從 InvokerWrapper#invoke 開始,在 consumer 服務引用分析的時候,我們知道根據 Invoke 呼叫的時候, dubbo 會建立 ProtocolListenerWrapper與 ProtocolFilterWrapper 來用整合框架使用者的擴充套件包含:InvokerListener 與 Filter

ProtocolListenerWrapper 在物件建立的時候就會呼叫InvokerListener#referred擴充套件,所以在遠端服務呼叫的時候最主要的還是 Filter 擴充套件,下面我們就看一下在遠端呼叫的時候預設包括哪些 Filter 擴充套件:

  • ConsumerContextFilter
  • FutureFilter
  • MonitorFilter

1.1 ConsumerContextFilter

ConsumerContextFilter 儲存客戶端資訊到 RpcContext

@Activate(group = Constants.CONSUMER, order = -10000)
public class ConsumerContextFilter implements Filter {

    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        RpcContext.getContext()
                .setInvoker(invoker)
                .setInvocation(invocation)
                .setLocalAddress(NetUtils.getLocalHost(), 0)
                .setRemoteAddress(invoker.getUrl().getHost(),
                        invoker.getUrl().getPort());
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(invoker);
        }
        try {
            return invoker.invoke(invocation);
        } finally {
            RpcContext.getContext().clearAttachments();
        }
    }

}

RpcContext 使用 ThreadLocal 來記錄一個臨時狀態。當接收到 RPC 請求,或發起 RPC請求時,RpcContext 的狀態都會變化。

> 比如:A 調 B,B 再調 C,則 B 機器上,在 B 調 C 之前,RpcContext 記錄的是 A 調 B 的資訊,在 B 調 C 之後,RpcContext 記錄的是 B 調 C 的資訊。

可以通過 RpcContext 上的 setAttachment 和 getAttachment 在服務消費方和提供方之間進行引數的隱式傳遞。

1.2 FutureFilter

FutureFilter 會來處理 dubbo 服務介面呼叫方配置 async="true" 來使用同步呼叫來是非同步呼叫。

public class FutureFilter implements Filter {

    protected static final Logger logger = LoggerFactory.getLogger(FutureFilter.class);

    public Result invoke(final Invoker<?> invoker, final Invocation invocation) throws RpcException {
        final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation);

        fireInvokeCallback(invoker, invocation);
        //需要在呼叫前配置好是否有返回值,已供invoker判斷是否需要返回future.
        Result result = invoker.invoke(invocation);
        if (isAsync) {
            asyncCallback(invoker, invocation);
        } else {
            syncCallback(invoker, invocation, result);
        }
        return result;
    }
}

同步呼叫 dubbo 就會同步的返回 provider 方法呼叫返回的響應.如果是非同步呼叫在進行呼叫的時候就會把請求資訊傳送到 provider 然後返回一個空的 RpcResultconsumer 端如果要獲取響應需要通過以下方法獲取:

// 拿到呼叫的Future引用,當結果返回後,會被通知和設定到此Future
Future<Bar> barFuture = RpcContext.getContext().getFuture(); 
// 同理等待bar返回
Bar bar = barFuture.get(); 

1.3 MonitorFilter

MonitorFilter 其實是在分析之前 dubbo monitor 的時候就進行了詳細的分析。它主要是通過以下配置來啟用 provider 與 consumer 端的指標監控。

<dubbo:monitor protocol="registry" />

我們還是簡單的來看一下它的原始碼:

public class MonitorFilter implements Filter {

    private MonitorFactory monitorFactory;

    public void setMonitorFactory(MonitorFactory monitorFactory) {
        this.monitorFactory = monitorFactory;
    }

    // 呼叫過程攔截
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        if (invoker.getUrl().hasParameter(Constants.MONITOR_KEY)) {
            RpcContext context = RpcContext.getContext(); // 提供方必須在invoke()之前獲取context資訊
            String remoteHost = context.getRemoteHost();
            long start = System.currentTimeMillis(); // 記錄起始時間戮
            getConcurrent(invoker, invocation).incrementAndGet(); // 併發計數
            try {
                Result result = invoker.invoke(invocation); // 讓呼叫鏈往下執行
                collect(invoker, invocation, result, remoteHost, start, false);
                return result;
            } catch (RpcException e) {
                collect(invoker, invocation, null, remoteHost, start, true);
                throw e;
            } finally {
                getConcurrent(invoker, invocation).decrementAndGet(); // 併發計數
            }
        } else {
            return invoker.invoke(invocation);
        }
    }
}

當啟動 dubbo monitor 的時候會暴露一個遠端服務 MonitorService 介面服務服務,具體的處理類是 SimpleMonitorService。而在 MonitorFilter#collect 方法裡面 MonitorFactory 會建立一個 Monitor 介面例項(繼承於 MonitorService)。其實就是 DubboMonitorFactroy#createMonitor 遠端引用 dubbo monitor 暴露的 MonitorService 服務。

public class DubboMonitorFactroy extends AbstractMonitorFactory {

    private Protocol protocol;

    private ProxyFactory proxyFactory;

    public void setProtocol(Protocol protocol) {
        this.protocol = protocol;
    }

    public void setProxyFactory(ProxyFactory proxyFactory) {
        this.proxyFactory = proxyFactory;
    }

    @Override
    protected Monitor createMonitor(URL url) {
        url = url.setProtocol(url.getParameter(Constants.PROTOCOL_KEY, "dubbo"));
        if (url.getPath() == null || url.getPath().length() == 0) {
            url = url.setPath(MonitorService.class.getName());
        }
        String filter = url.getParameter(Constants.REFERENCE_FILTER_KEY);
        if (filter == null || filter.length() == 0) {
            filter = "";
        } else {
            filter = filter + ",";
        }
        url = url.addParameters(Constants.CLUSTER_KEY, "failsafe", Constants.CHECK_KEY, String.valueOf(false),
                Constants.REFERENCE_FILTER_KEY, filter + "-monitor");
        Invoker<MonitorService> monitorInvoker = protocol.refer(MonitorService.class, url);
        MonitorService monitorService = proxyFactory.getProxy(monitorInvoker);
        return new DubboMonitor(monitorInvoker, monitorService);
    }

}

獲取到遠端服務 SimpleMonitorService,最後在 MonitorFilter#collect 呼叫 MonitorService#collect 進行監控資料採集提供給 dubbo monitor。呼叫過程如下所示:

consumer 傳送擴充套件.jpg

2、consumer 傳送原理

最終 consumer 會到 DubboInvoke 進行服務呼叫。它會在 AbstractInvoker#invoke 新增一些擴充套件引數到 RpcInvocation這個遠端呼叫物件裡面。新增的擴充套件引數包含:

  • interface : 遠端呼叫的介面名稱
  • group : 介面分組名稱
  • token : 呼叫的 token 資訊
  • timeout : 呼叫服務的超時時間
  • async : 是否非同步呼叫
  • id : 非同步操作預設新增 invocation id,用於保證操作冪等

以及 RpcContext 傳遞過來的擴充套件引數(RpcContext#attachments)。然後在 DubboInvoker#doInvoke 中會新增 path (介面全類名) 以及 version(版本資訊)。再根據 dubbo 的呼叫模式進行遠端呼叫,包含以下三種呼叫模式:

  • oneway 模式:<dubbo:method>標籤的 return 屬性配置為false,則是oneway模式,利用ExchangeClient 物件向服務端傳送請求訊息之後,立即返回空 RpcResult 物件
  • 非同步模式:<dubbo:method>標籤的 async 屬性配置為 ture,則是非同步模式,直接返回空 RpcResult物件,由 FutureFilter 和 DefaultFuture 完成非同步處理工作
  • 同步模式:預設即是同步,則傳送請求之後執行緒進入等待狀態,直到收到服務端的響應訊息或者超時。

下面我們看一下 dubbo 同步呼叫時序圖:

DubboInvoke.png

ChannelFuture future = channel.write(message);

最終是呼叫 org.jboss.netty.channel.Channel 通過 socket 傳送訊息到從叢集中選擇出的一個暴露服務資訊的伺服器傳送網路資料。

3、consumer 接收原理

我們都知道 dubbo 其實是通過 netty 來進行 socket 通訊的。而在使用 netty 進行網路程式設計的時候,其實核心就是就是實現 ChannelHandler。而在 dubbo 中對應的實現類就是 NettyHandler(高版本支援支援 netty 4 使用的是 NettyClientHandler ,NettyHandler 使用的是 netty 3.x)。如果在 consumer 端(provider 也支援)需要使用 netty 4 進行業務處理,需要進行進行以下配置:

<dubbo:consumer client="netty4" />

所以 consumer 接收 provider 響應的入口就在 NettyClientHandler#channelRead

NettyClientHandler.jpg

首先 ChannelHandler 用於接收 provider 端響應回來的請求,然後經過 5 個 dubbo 自定義的 ChannelHandler

  • MultiMessageHandler:支援 MultiMessage 訊息處理,也就是多條訊息處理。
  • HeartbeatHandler:netty 心條檢測。如果心跳請求,傳送心跳然後直接 return,如果是心跳響應直接 return
  • AllChannelHandler:使用執行緒池通過 ChannelEventRunnable 工作類來處理網路事件。
  • DecodeHandler:解碼 message,解析成 dubbo 中的 Response 物件
  • HeaderExchangeHandler:處理解析後的 provider 端返回的 Response 響應資訊,把響應結果賦值到 DefaultFuture 響應獲取阻塞物件中。

4、dubbo 非同步變同步

我們都知道 dubbo 是基於 netty NIO 的非阻塞 並行呼叫通訊。所以 dubbo 在 consumer 請求 provider 後響應都是非同步的。但是在 dubbo 裡面預設是同步返回的,那麼 dubbo 是如何把非同步響應變成同步請求的呢?帶著這個問題,首先我們來看一下 dubbo 裡面的幾種請求方式。

4.1 非同步且無返回值

這種請求最簡單,consumer 把請求資訊傳送給 provider 就行了。只是需要在 consumer 端把請求方式配置成非同步請求就好了。如下:

<dubbo:method name="sayHello" return="false"></dubbo:method>

4.2 非同步且有返回值

這種情況下 consumer 首先把請求資訊傳送給 provider 。這個時候在 consumer 端不僅把請求方式配置成非同步,並且需要 RpcContext 這個 ThreadLocal 物件獲取到 Future 物件,然後通過 Future#get() 阻塞式獲取到 provider 的響應。那麼這個 Future 是如果新增到 RpcContext 中呢?

在第二小節講服務傳送的時候, 在 DubboInvoke 裡面有三種呼叫方式,之前只具體請求了同步請求的傳送方式而且沒有非同步請求的傳送。非同步請求傳送程式碼如下:

> DubboInvoker#doInvoke 中的 else if (isAsync) 分支

    ResponseFuture future = currentClient.request(inv, timeout);
    FutureAdapter<T> futureAdapter = new FutureAdapter<>(future);
    RpcContext.getContext().setFuture(futureAdapter);
    Result result;
    if (RpcUtils.isAsyncFuture(getUrl(), inv)) {
        result = new AsyncRpcResult<>(futureAdapter);
    } else {
        result = new RpcResult();
    }
    return result;

上面的程式碼邏輯是直接傳送請求到 provider 返回一個 ResponseFuture 例項,然後把這個 Future 物件儲存到 RpcContext#LOCAL 這個 ThreadLocal 當前執行緒物件當中,並且返回一個空的 RpcResult物件。如果要獲取到 provider響應的資訊,需要進行以下操作:

// 拿到呼叫的Future引用,當結果返回後,會被通知和設定到此Future
Future<String> temp= RpcContext.getContext().getFuture();
// 同理等待bar返回
hello=temp.get();

4.3 非同步變同步(預設)

下面我們就來討論一下 dubbo 是如何把非同步請求轉化成同步請求的。其實原理和非同步請求的通過 Future#get 等待 provider 響應返回一樣,只不過非同步有返回值是顯示呼叫而預設是 dubbo 內部把這步完成了。下面我們就來分析一下 dubbo 是如何把 netty 的非同步響應變成同步返回的。(當前執行緒怎麼讓它 "暫停",等結果回來後,再執行?)

我們都知道在 consumer 傳送請求的時候會呼叫 HeaderExchangeChannel#request 方法:

> HeaderExchangeChannel#request

    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

它首先會通過 dubbo 自定義的 ChannelRequest 與 timeout(int) 構造一個 DefaultFuture 物件。然後再通過 NettyChannel 傳送請求到 provider,最後返回這個 DefaultFuture。下面我們來看一下通過構造方法是如何建立 DefaultFuture 的。我只把主要涉及到的屬性展示出來:

public class DefaultFuture implements ResponseFuture {

    private static final Map<Long, Channel> CHANNELS = new ConcurrentHashMap<Long, Channel>();

    private static final Map<Long, DefaultFuture> FUTURES = new ConcurrentHashMap<Long, DefaultFuture>();

    private final long id;
    private final Channel channel;
    private final Request request;
    private final int timeout;

    public DefaultFuture(Channel channel, Request request, int timeout) {
        this.channel = channel;
        this.request = request;
        this.id = request.getId();
        this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
        // put into waiting map.
        FUTURES.put(id, this);
        CHANNELS.put(id, channel);
    }
}

這個 id 是在建立 Request 的時候使用 AtomicLong#getAndIncrement 生成的。從 1 開始並且如果它一直增加直到生成負數也能保證這臺機器這個值是唯一的,且不衝突的。符合唯一主鍵原則。 dubbo 默認同步變非同步其實和非同步呼叫一樣,也是在 DubboInvoker#doInvoke 實現的。

> DubboInvoker#doInvoke

    RpcContext.getContext().setFuture(null);
    return (Result) currentClient.request(inv, timeout).get();

關鍵就在 ResponseFuture#get 方法上面,下面我們來看一下這個方法的原始碼:

    public Object get(int timeout) throws RemotingException {
        if (timeout <= 0) {
            timeout = Constants.DEFAULT_TIMEOUT;
        }
        if (!isDone()) {
            long start = System.currentTimeMillis();
            lock.lock();
            try {
                while (!isDone()) {
                    done.await(timeout, TimeUnit.MILLISECONDS);
                    if (isDone() || System.currentTimeMillis() - start > timeout) {
                        break;
                    }
                }
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } finally {
                lock.unlock();
            }
            if (!isDone()) {
                throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
            }
        }
        return returnFromResponse();
    }

其實就是 while 迴圈,利用 java 的 lock 機制判斷如果在超時時間範圍內 DefaultFuture#response 如果賦值成不為空就返回響應,否則丟擲 TimeoutException 異常。下面我們就來看一下 DefaultFuture#response 是如何被賦值的。

還記得 consumer 接收 provider 響應的最後一步嗎?就是 

相關推薦

dubbo原始碼分析22 -- consumer 傳送接收原理

在前面的文章中,我們分析了 dubbo 從 provider 進行服務暴露,然後把服務資訊註冊到註冊中心上面解耦 consumer 與 provider 的呼叫。consumer 通過 javassist 建立代理物件引用遠端服務。當通過代理物件呼叫遠端服務的時候,講到進行真

分散式訊息佇列 RocketMQ 原始碼分析 —— Message 順序傳送消費

本文主要基於 RocketMQ 4.0.x 正式版 1. 概述 建議前置閱讀內容: 當然對 Message 傳送與消費已經有一定了解的同學,可以選擇跳過。 RocketMQ 提供了兩種順序級別: 普通順序訊息 :Producer 將相關聯的訊息傳送到相同

dubbo原始碼分析(二):超時原理以及應用場景

本篇主要記錄dubbo中關於超時的常見問題,實現原理,解決的問題以及如何在服務降級中體現作用等。 超時問題 為了檢查對dubbo超時的理解,嘗試回答如下幾個問題,如果回答不上來或者不確定那麼說明此處需要再多研究研究。 我只是針對個人的理解提問題,並不代表我理解的就是全面深入的,但我的問題如果也回答不

郵件原理JavaMail開發(一)——郵件的傳送接收原理

Java郵件開發介紹 為什麼要學習javamail開發 現在很多WEB應用在開發時都需要整合郵件傳送功能,例如: 給新註冊的使用者自動傳送一封包含其註冊資訊的歡迎E-Mail。 給過生日的註冊會員自動傳送一封表示祝賀的E-Mail。 將網站的最新活動

dubbo源碼福彩快三平臺搭建分析22 -- consumer 發送接收原理

pre else 創建 機器 def 都是 etc group get() 在前面福彩快三平臺搭建論壇:haozbbs.com Q1446595067的文章中,我們分析了 dubbo 從 provider 進行服務暴露,然後把服務信息註冊到註冊中心上面解耦 consumer

dubbo原始碼分析23 -- provider 接收傳送原理

在前面一篇部落格中分享了 dubbo 在網路通訊當中的 consumer 的傳送以及接收原理。通過叢集容錯最終選擇一個合適的 Invoke 通過 netty 直聯呼叫 provider 的服務。眾所周知, netty 是基於 Java Nio 的 Reactor 模型的非同步

Dubbo原始碼分析:RPC協議實現-服務端併發控制Semaphore訊號量

概述 Dubbo支援在服務端通過在service或者method,通過executes引數設定每個方法,允許併發呼叫的最大執行緒數,即在任何時刻,只允許executes個執行緒同時呼叫該方法,超過的則拋異常返回,從而對提供者服務進行併發控制,保護資源。 用法 服務級別 限

Dubbo原始碼分析:RPC協議實現-RPC過程核心介面設計

RPC的基本過程 提供者Provider:提供服務的介面定義和介面的具體實現,然後通過URL的方式告訴消費者,某個URL對應某個service實現,一般是將服務的資訊註冊到一個註冊中心,如zookeeper或者Redis等; 消費者Consumer:獲取提供者的介面定義

dubbo原始碼分析-RPC遠端呼叫模組Remoting通訊模組協作細節

閱讀需要技能: 1.設計模式:理解代理模式。JDK動態代理類的使用。 2.設計模式:理解裝飾模式。 3.Netty網路通訊程式設計,server,handler,channel 4.瞭解Dubbo基本原理,Dubbo模組各層分包關係

dubbo原始碼分析-consumer端3-Invoker建立流程

        從前面一篇建立註冊中心的流程當中,我們知道在從註冊中心獲取到provider的連線資訊後,會通過連線建立Invoker。程式碼見com.alibaba.dubbo.registry.integration.RegistryDirectory的toInvoke

dubbo原始碼分析-consumer端1-consumer代理生成

        dubbo(官網地址)是一個分散式服務框架,致力於提供高效能和透明化的RPC遠端服務呼叫方案,是阿里巴巴SOA服務化治理方案的核心框架。目前,阿里巴巴內部已經不再使用dubbo,但對很對未到一定量級的公司來說,dubbo依然是一個很好的選擇。        

Dubbo原始碼分析Dubbo自己實現的IOC

  在建立自適應例項時,都會呼叫ExtensionLoader的injectExtension方法: @SuppressWarnings("unchecked") private T createAdaptiveExtension() { try {

Dubbo 原始碼分析 - 服務匯出全過程解析

1.服務匯出過程 本篇文章,我們來研究一下 Dubbo 匯出服務的過程。Dubbo 服務匯出過程始於 Spring 容器釋出重新整理事件,Dubbo 在接收到事件後,會立即執行服務匯出邏輯。整個邏輯大致可分為三個部分,第一是前置工作,主要用於檢查引數,組裝 URL。第二是匯出服務,包含匯出服務到本地 (JV

Dubbo原始碼解析之服務釋出註冊

準備 dubbo版本:2.5.4 Spring自定義擴充套件 dubbo 是基於 spring 配置來實現服務釋出,並基於 spring 的擴充套件機制定義了一套自定義標籤,要實現自定義擴充套件, spring 中提供了 NamespaceHandler 、BeanDefinit

zigbee 之ZStack-2.5.1a原始碼分析(二) 無線接收控制LED

本文描述ZStack-2.5.1a 模板及無線接收移植相關內容。 main HAL_BOARD_INIT // HAL_TURN_OFF_LED1 InitBoard HalDriverInit HalAdcInit

Dubbo原始碼分析(六)Dubbo通訊的編碼解碼機制

Dubbo原始碼分析(一)Dubbo的擴充套件點機制 Dubbo原始碼分析(二)Dubbo服務釋出Export Dubbo原始碼分析(三)Dubbo的服務引用Refer Dubbo原始碼分析(四)Dubbo呼叫鏈-消費端(叢集容錯機制) Dubbo原始碼分析(五)Dubbo呼叫鏈-服務端

Dubbo 原始碼分析系列之三 —— 架構原理

1 核心功能 首先要了解Dubbo提供的三大核心功能: Remoting:遠端通訊 提供對多種NIO框架抽象封裝,包括“同步轉非同步”和“請求-響應”模式的資訊交換方式。 Cluster: 服務框架 提供基於介面方法的透明遠端過程呼叫,包括多協議支援,以及

tomcat 9.0.11 原始碼分析--總結startup.shcatalina.sh

startup.sh與catalina.sh只是啟動指令碼,通過環境變數與指令碼所在目錄確定java的啟動引數來正確啟動tomcat, 從引數可以找到啟動類是org.apache.catalina.startup.Bootstrap裡的main方法 指令碼中常用判斷 if [ -z

Dubbo 原始碼分析 - 自適應北京PK10原始碼出售拓展原理

1.原理我在上一篇文章北京PK10原始碼出售 QQ2952777280【話仙原始碼論壇】hxforum.com 分析了 Dubbo 的 SPI 機制,Dubbo SPI 是 Dubbo 框架的核心。Dubbo 中的很多拓展都是通過 SPI 機制進行載入的,比如 Protocol、Cluster、LoadBal

Dubbo 原始碼分析 - 服務引用

1. 簡介 在上一篇文章中,我詳細的分析了服務匯出的原理。本篇文章我們趁熱打鐵,繼續分析服務引用的原理。在 Dubbo 中,我們可以通過兩種方式引用遠端服務。第一種是使用服務直聯的方式引用服務,第二種方式是基於註冊中心進行引用。服務直聯的方式僅適合在除錯或測試服務的場景下使用,不適合在線上環境使用。因此,本