1. 程式人生 > >Dubbo原始碼解析(十一) Dubbo Exchanger

Dubbo原始碼解析(十一) Dubbo Exchanger

先看一下Exchanger的介面定義,就是bindconnectbind是服務端呼叫的,繫結一個埠用來接收客戶端的請求。connect是作為一個客戶端去連線服務端,進行和服務端交換。

@SPI(HeaderExchanger.NAME)
public interface Exchanger {

    /**
     * bind.
     *
     * @param url
     * @param handler
     * @return message server
     */
    @Adaptive({Constants.EXCHANGER_KEY})
    ExchangeServer bind(URL url, ExchangeHandler handler) throws
RemotingException; /** * connect. * * @param url * @param handler * @return message channel */ @Adaptive({Constants.EXCHANGER_KEY}) ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException; }

那麼Exchanger是被誰呼叫了?
這裡寫圖片描述
可以看到主要是被Exchangers

這個工具類呼叫了,那麼這個工具類具體做了什麼事情
這裡寫圖片描述
就是bindconnect方法以及建立Exchanger的功能

建立Exchanger

    public static Exchanger getExchanger(String type) {
        return ExtensionLoader.getExtensionLoader(Exchanger.class).getExtension(type);
    }

bind

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws
RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handler == null) { throw new IllegalArgumentException("handler == null"); } url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange"); return getExchanger(url).bind(url, handler); }

connect方法

    public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        return getExchanger(url).connect(url, handler);
    }

可以看到 邏輯性不是很強,但是這個分層到底是為了什麼?直接在Protocol釋出/引用服務的時候建立一個Exchanger不就好了嗎?

為什麼要抽象一個工具類呢?

這個後面再思考思考,什麼時候寫公共類,什麼時候寫這種工具類

先聊聊Exchanger的作用是什麼?

exchange 資訊交換層:封裝請求響應模式,同步轉非同步,以 Request, Response 為中心,擴充套件介面為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

RequestResponse,很熟悉,和Servlet是不是很像,那麼這一層應該是類似於Tomcat接收外界的請求,封裝為RequestResponse進行封裝與轉發,作為內部與外部連線的轉接層。確實是哦,Protocol這一層主要是進行服務的釋出與引用,相當於內部的處理都做好了,但是現在與外部個還沒有連線,所以對於injvm這些不需要與外界進行溝通的協議或者如rmi這種有自己格式固定的通訊並有相應的支支撐的協議,就不需要這個交換層了。

Dubbo裡面只有一個HeaderExchanger的預設實現,對心跳的功能的包裝

public class HeaderExchanger implements Exchanger {

    public static final String NAME = "header";

    public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
        // 開啟一個Transporters的客戶端
        // 解碼 -> 心跳handler包裝原始的handler    裝飾者模式 哇哇哇哇
        return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
    }

    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        // 開啟一個Transporters的服務端
        return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

}

建立一個交換層的客戶端,也就是ReferenceBean的訊息傳送者

    public HeaderExchangeClient(Client client, boolean needHeartbeat) {
        if (client == null) {
            throw new IllegalArgumentException("client == null");
        }
        this.client = client;
        this.channel = new HeaderExchangeChannel(client);
        // 獲取dubbo協議版本
        String dubbo = client.getUrl().getParameter(Constants.DUBBO_VERSION_KEY);
        this.heartbeat = client.getUrl().getParameter(Constants.HEARTBEAT_KEY, dubbo != null && dubbo.startsWith("1.0.") ? Constants.DEFAULT_HEARTBEAT : 0);
        // 獲取心跳超時時間
        this.heartbeatTimeout = client.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        // 如果心跳超時小於兩個心跳間隔的時間 
        if (heartbeatTimeout < heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout < heartbeatInterval * 2");
        }
        // 如果需要傳送心跳
        if (needHeartbeat) {
            startHeartbeatTimer();
        }
    }

如何傳送心跳的

  1. 停止之前的心跳傳送器
  2. 啟動一個新的心跳
    通過 ScheduledThreadPoolExecutor 定時任務執行緒池
    // 過heartbeat時間後開始沒heartbeat傳送心跳
    private void startHeartbeatTimer() {
        stopHeartbeatTimer();
        if (heartbeat > 0) {
            heartbeatTimer = scheduled.scheduleWithFixedDelay(
            heartBeatTask,
            heartbeat, 
            heartbeat, 
            TimeUnit.MILLISECONDS);
        }
    }

heartBeatTask 心跳任務

    new HeartBeatTask(new HeartBeatTask.ChannelProvider() {
        public Collection<Channel> getChannels() {
            return Collections.<Channel>singletonList(HeaderExchangeClient.this);
        }
    }, heartbeat, heartbeatTimeout);

HeaderExchangeServer 也是類似的功能,不一樣的地方就是裡面的屬性serverclient,這些都是Transport這裡面的,後面還是要梳理梳理整個dubbo的流程
且聽下回分解

相關推薦

Dubbo原始碼解析 Dubbo Exchanger

先看一下Exchanger的介面定義,就是bind和connect ,bind是服務端呼叫的,繫結一個埠用來接收客戶端的請求。connect是作為一個客戶端去連線服務端,進行和服務端交換。 @SP

Spring原始碼解析——AOP原理——demo

1.業務類 public class MathCalculator { public int div(int i, int j) { System.out.println("MathCalculator---div"); return i / j;

jdk原始碼解析——Java記憶體模型與執行緒

前面我們瞭解了Java的編譯和執行,這裡在講解一下高效併發(Java記憶體模型與執行緒)在瞭解記憶體模型與執行緒之前,我們先要了解一些東西。 1 硬體效率與一致性  計算併發執行的執行和充分利用計算機處理器的效能兩者看來是互為因果的,而在大多數的時候,計算機的處理速度不止是在處理器

Mybaits 原始碼解析 ----- 設計模式精妙使用:靜態代理和動態代理結合使用:@MapperScan將Mapper介面生成代理注入到Spring

上一篇文章我們講了SqlSessionFactoryBean,通過這個FactoryBean建立SqlSessionFactory並註冊進Spring容器,這篇文章我們就講剩下的部分,通過MapperScannerConfigurer將Mapper介面生成代理注入到Spring 掃描Mapper介面 我們上一

微服務框架Dubbo呼叫攔截及引數校檢擴充套件

  此係列文章將會描述Java框架Spring Boot、服務治理框架Dubbo、應用容器引擎Docker,及使用Spring Boot整合Dubbo、Mybatis等開源框架,其中穿插著Spring Boot中日誌切面等技術的實現,然後通過gitlab-CI以

Spring原始碼解析——AOP原理——獲取攔截器鏈——MethodInterceptor

   *     3)、目標方法執行    ;  *         容器中儲存了元件的代理物件(cglib增強後的物件),這個物件裡面儲存了詳細

Spring原始碼解析——AOP原理——建立aop代理

   * AnnotationAwareAspectJAutoProxyCreator【InstantiationAwareBeanPostProcessor】    的作用:  * 1)、每一個bean建立之前,呼叫postProce

Spring原始碼解析——AOP原理——AnnotationAwareAspectJAutoProxyCreator執行時機

     *             AnnotationAwareAspectJAutoProxyCreator => InstantiationAwareBean

Spring原始碼解析——AOP原理——@EnableAspectJAutoProxy

一、@EnableAspectJAutoProxy 第一步:註冊AnnotationAwareAspectJAutoProxyCreator 把AnnotationAwareAspectJAutoProxyCreator建立為RootBeanDefinition,加入

yocto-sumo源碼解析: recvfds

socket sum arr bsp cti 不為 runt data eve def recvfds(sock, size): ‘‘‘Receive an array of fds over an AF_UNIX socket.‘‘‘ a

jdk原始碼解析——執行緒安全與鎖優化

上一節我們說了Java記憶體模型與執行緒、那麼我們這節來了解一下執行緒安全與鎖優化 1 概述 在軟體業發展的初期,程式編寫都是以演算法為核心的,程式設計師會把資料和過程分別作為獨立的部分來考慮,資料代表問題空間中的客體,程式程式碼則用於處理這些資料,這種思維方式直接站在計算機的角度去抽象問題

Android原始碼解析-->Dialog載入繪製流程

前面兩篇文章,我們分析了Activity的佈局檔案載入、繪製流程,算是對整個Android系統中介面的顯示流程有了一個大概的瞭解,其實Android系統中所有的顯示控制元件(注意這裡是控制元件,而不是元件)的載入繪製流程都是類似的,包括:Dialog的載入繪

Ethercat解析之分佈時鐘

驅動程式碼中,同步時鐘涉及到如下幾個概念: ⑴ 本地時鐘:每一個支援DC的從站都有一個納秒級解析度的本地時鐘暫存器。 每次從機上電,則暫存器從0開始計時,這就意味著不同的從機因為上電開機的時間不同而本地時鐘也會有差異,所以需要對從機的本地時鐘對比參考時鐘

scrapy原始碼分析----------下載器Downloader

經過前面幾篇的分析,scrapy的五大核心元件已經介紹了4個:engine,scheduler,scraper,spidemw。 還剩最後一個downloader,這個下載器關係到了網頁如何下載,內容相對來說是最為複雜的一部分,這篇教程就逐步分析其原始碼。 下載操作開始於

Github專案解析-->一個簡單,強大的自定義廣告活動彈窗

上一篇文章中講解了我最近寫的一個快速整合二維碼掃描庫,其核心的實現掃描的功能,是通過呼叫ZXing庫實現的。由於在實現二維碼掃描功能的時候發現整合二維碼掃描功能並不是特別方便,於是有了將其製作成標準庫的想法,這個二維碼庫能夠快速,方便的整合二維碼掃描功能,專

HAWQ技術解析 —— 資料管理

一、基本操作1. INSERT        在常用的增刪改查資料庫操作中,HAWQ僅支援INSERT和SELECT兩種,不支援UPDATE和DELETE,這主要是因為HDFS是一個只能追加資料而不能更新的檔案系統。SELECT語句最熟悉不過,它應該是資料庫中最常用的語句了,

Netty原始碼分析 ----- 拆包器之LengthFieldBasedFrameDecoder

本篇文章主要是介紹使用LengthFieldBasedFrameDecoder解碼器自定義協議。通常,協議的格式如下: LengthFieldBasedFrameDecoder是netty解決拆包粘包問題的一個重要的類,主要結構就是header+body結構。我們只需要傳入正確的引數就可以傳送和接收正確

Mybaits 原始碼解析 ----- Mybatis的事務如何被Spring管理?Mybatis和Spring事務中用的Connection是同一個嗎?

不知道一些同學有沒有這種疑問,為什麼Mybtis中要配置dataSource,Spring的事務中也要配置dataSource?那麼Mybatis和Spring事務中用的Connection是同一個嗎?我們常用配置如下 <!--會話工廠 --> <bean id="sqlSessionFa

深度學習論文翻譯解析:OverFeat: Integrated Recognition, Localization and Detection using Convolutional Networks

論文標題:OverFeat: Integrated Recognition, Localization and Detection using Convolutional Networks     標題翻譯:OverFeat:使用卷積神經網路整合識別,定位和檢測 論文作者:Pierre Sermanet&nb

Dubbo原始碼解析請求排程器 Dispatcher

排程器 Dispatcher 排程策略 all 所有訊息都派發到執行緒池,包括請求,響應,連線事件,斷開事件,心跳等。 direct 所有訊息都不派發到執行緒池,全部在 IO 執行緒上直接執行。 message 只有請求響應訊息派發到執行緒池,其它連線斷開