1. 程式人生 > >SOFA 源碼分析 —— 服務發布過程

SOFA 源碼分析 —— 服務發布過程

發布服務 provider 類加載 AD erb ldif api 地方 rap

技術分享圖片

前言

SOFA 包含了 RPC 框架,底層通信框架是 bolt ,基於 Netty 4,今天將通過 SOFA—RPC 源碼中的例子,看看他是如何發布一個服務的。

示例代碼

下面的代碼在 com.alipay.sofa.rpc.quickstart.QuickStartServer 類下。

ServerConfig serverConfig = new ServerConfig()
    .setProtocol("bolt") // 設置一個協議,默認bolt
    .setPort(9696) // 設置一個端口,默認12200
    .setDaemon(false); // 非守護線程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>() .setInterfaceId(HelloService.class.getName()) // 指定接口 .setRef(new HelloServiceImpl()) // 指定實現 .setServer(serverConfig); // 指定服務端 providerConfig.export(); // 發布服務

首先,創建一個 ServerConfig ,包含了端口,協議等基礎信息,當然,這些都是手動設定的,在該類加載的時候,會自動加載很多配置文件中的服務器默認配置。比如 RpcConfigs 類,RpcRuntimeContext 上下文等。

然後呢,創建一個 ProviderConfig,也是個 config,不過多繼承了一個 AbstractInterfaceConfig 抽象類,該類是接口級別的配置,而 ServerConfig 是 服務器級別的配置。雖然都繼承了 AbstractIdConfig。

ProviderConfig 包含了接口名稱,接口指定實現類,還有服務器的配置。

最後,ProviderConfig 調用 export 發布服務。

展示給我的 API 很簡單,但內部是如何實現的呢?

在看源碼之前,我們思考一下:如果我們自己來實現,怎麽弄?

RPC 框架簡單一點來說,就是使用動態代理和 Socket。

SOFA 使用 Netty 來做網絡通信框架,我們之前也寫過一個簡單的 Netty RPC,主要是通過 handler 的 channelRead 方法來實現。

SOFA 是這麽操作的嗎?

一起來看看。

# 源碼分析

上面的示例代碼其實就是 3 個步驟,創建 ServerConfig,創建 ProviderConfig,調用 export 方法。

先看第一步,還是有點意思的。

雖然是空構造方法,但 ServerConfig 的屬性都是自動初始化的,而他的父類 AbstractIdConfig 更有意思了,父類有 1 個地方值得註意:

static {
    RpcRuntimeContext.now();
}

熟悉類加載的同學都知道,這是為了主動加載 RpcRuntimeContext ,看名字是 RPC 運行時上下文,所謂上下文,大約就是我們人類聊天中的 "老地方" 的意思。

這個上下文會在靜態塊中加載 Module(基於擴展點實現),註冊 JVM 關閉鉤子(類似 Tomcat)。還有很多配置信息。

然後呢?創建 ProviderConfig 對象。這個類比上面的那個類多繼承了一個 AbstractInterfaceConfig,接口級別的配置。比如有些方法我不想發布啊,比如權重啊,比如超時啊,比如具體的實現類啊等等,當然還需要一個 ServerConfig 的屬性(註冊到 Server 中啊餵)。

最後就是發布了。export 方法。

ProviderCofing 擁有一個 export 方法,但並不是直接就在這裏發布的,因為他是一個 config,不適合在config 裏面做這些事情,違背單一職責。

SOFA 使用了一個 Bootstrap 類來進行操作。和大部分服務器類似,這裏就是啟動服務器的地方。因為這個類會多線程使用,比如並發的發布服務。而不是一個一個慢慢的發布服務。所以他不是單例的,而是和 Config 一起使用的,並緩存在 map 中。

ProviderBootstrap 目前有 3 個實現:Rest,Bolt,Dubbo。Bolt 是他的默認實現。

export 方法默認有個實現(Dubbo 的話就要重寫了)。主要邏輯是執行 doExport 方法,其中包括延遲加載邏輯。

而 doExport 方法中,就是 SOFA 發布服務的邏輯所在了。

樓主將方法的異常處理邏輯去除,整體如下:

 private void doExport() {
        if (exported) {
            return;
        }
        String key = providerConfig.buildKey();
        String appName = providerConfig.getAppName();
        // 檢查參數
        checkParameters();
        // 註意同一interface,同一uniqleId,不同server情況
        AtomicInteger cnt = EXPORTED_KEYS.get(key); // 計數器
        if (cnt == null) { // 沒有發布過
            cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
        }
        int c = cnt.incrementAndGet();
        int maxProxyCount = providerConfig.getRepeatedExportLimit();
        if (maxProxyCount > 0) {
          // 超過最大數量,直接拋出異常
        }
        // 構造請求調用器
        providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
        // 初始化註冊中心
        if (providerConfig.isRegister()) {
            List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
            if (CommonUtils.isNotEmpty(registryConfigs)) {
                for (RegistryConfig registryConfig : registryConfigs) {
                    RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                }
            }
        }
        // 將處理器註冊到server
        List<ServerConfig> serverConfigs = providerConfig.getServer();
        for (ServerConfig serverConfig : serverConfigs) {
            Server server = serverConfig.buildIfAbsent();
            // 註冊序列化接口
            server.registerProcessor(providerConfig, providerProxyInvoker);
            if (serverConfig.isAutoStart()) {
                server.start();
            }
        }

        // 註冊到註冊中心
        providerConfig.setConfigListener(new ProviderAttributeListener());
        register();

        // 記錄一些緩存數據
        RpcRuntimeContext.cacheProviderConfig(this);
        exported = true;
    }

主要邏輯如下:

  1. 根據 providerConfig 創建一個 key 和 AppName。
  2. 檢驗同一個服務多次發布的次數。
  3. 創建一個 ProviderProxyInvoker, 其中包含了過濾器鏈,而過濾器鏈的最後一鏈就是對接口實現類的調用。
  4. 初始化註冊中心,創建 Server(會有多個Server,因為可能配置了多個協議)。
  5. 將 config 和 invoker 註冊到 Server 中。內部是將其放進了一個 Map 中。
  6. 啟動 Server。啟動 Server 其實就是啟動 Netty 服務,並創建一個 RpcHandler,也就是 Netty 的 Handler,這個 RpcHandler 內部含有一個數據結構,包含接口級別的 invoker。所以,當請求進入的時候,RpcHandler 的 channelRead 方法會被調用,然後間接的調用 invoker 方法。
  7. 成功啟動後,註冊到註冊中心。將數據緩存到 RpcRuntimeContext 的一個 Set 中。

一起來詳細看看。

Invoker 怎麽構造的?很簡單,最主要的就是過濾器。關於過濾器,我們之前已經寫過一篇文章了。不再贅述。

關鍵看看 Server 是如何構造的。

關鍵代碼 serverConfig.buildIfAbsent(),類似 HashMap 的 putIfAbsent。如果不存在就創建。

Server 接口目前有 2 個實現,bolt 和 rest。當然,Server 也是基於擴展的,所以,不用怕,可以隨便增加實現。

關鍵代碼在 ServerFactory 的 getServer 中,其中會獲取擴展點的 Server,然後,執行 Server 的 init 方法,我們看看默認 bolt 的 init 方法。

    @Override
    public void init(ServerConfig serverConfig) {
        this.serverConfig = serverConfig;
        // 啟動線程池
        bizThreadPool = initThreadPool(serverConfig);
        boltServerProcessor = new BoltServerProcessor(this);
    }

保存了 serverConfig 的引用,啟動了一個業務線程池,創建了一個 BoltServerProcessor 對象。

第一:這個線程池會在 Bolt 的 RpcHandler 中被使用,也就是說,復雜業務都是在這個線程池執行,不會影響 Netty 的 IO 線程。

第二:BoltServerProcessor 非常重要,他的構造方法包括了當前的 BoltServer,所以他倆是互相依賴的。關鍵點來了:

BoltServerProcessor 實現了 UserProcessor 接口,而 Bolt 的 RpcHandler 持有一個 Map<String, UserProcessor<?>>,所以,當 RpcHandler 被執行 channelRead 方法的時候,一定會根據接口名稱找到對應的 UserProcessor,並執行他的 handlerRequest 方法。

那麽,RpcHandler 是什麽時候創建並放置到 RpcHandler 中的呢?

具體是這樣的:在 server.start() 執行的時候,該方法會初始化 Netty 的 Server,在 SOFA 中,叫 RpcServer,將 BoltServerProcessor 放置到名叫 userProcessors 的 Map 中。然後,當 RpcServer 啟動的時候,也就是 start 方法,會執行一個 init 方法,該方法內部就是設置 Netty 各種屬性的地方,包括 Hander,其中有 2 行代碼對我們很重要:

final RpcHandler rpcHandler = new RpcHandler(true, this.userProcessors);
pipeline.addLast("handler", rpcHandler);

創建了一個 RpcHandler,並添加到 pipeline 中,這個 Handler 的構造參數就是包含所有 BoltServerProcessor 的 Map。

所以,總的流程就是:

每個接口都會創建一個 providerConfig 對象,這個對象會創建對應的 invoker 對象(包含過濾器鏈),這兩個對象都會放到 BoltServer 的 invokerMap 中,而 BoltServer 還包含其他對象,比如 BoltServerProcessor(繼承 UserProcessor), RpcServer(依賴 RpcHandler)。當初始化 BoltServerProcessor 的時候,會傳入 this(BoltServer),當初始化 RpcServer 的時候,會傳入 BoltServerProcessor 到 RpcServer 的 Map 中。在 RpcHandler 初始化的時候,又會將 RpcServer 的 Map 傳進自己的內部。完成最終的依賴。
當請求進入,RpcHandler 調用對應的 UserProcessor 的 handlerRequest 方法,而該方法中,會調用對應的 invoker,invoker 調用過濾器鏈,知道調用真正的實現類。

而大概的 UML 圖就是下面這樣的:

技術分享圖片

紅色部分是 RPC 的核心,包含 Solt 的 Server,實現 UserProcessor 接口的 BoltServerProcessor,業務線程池,存儲所有接口實現的 Map。

綠色部分是 Bolt 的接口和類,只要實現了 UserProcessor 接口,就能將具體實現替換,也既是處理具體數據的邏輯。

最後,看看關鍵類 BoltServerProcessor ,他是融合 RPC 和 Bolt 的膠水類。

該類會註冊一個序列化器替代 Bolt 默認的。handleRequest 方法是這個類的核心方法。有很多邏輯,主要看這裏:

// 查找服務
Invoker invoker = boltServer.findInvoker(serviceName);
// 真正調用
response = doInvoke(serviceName, invoker, request);

/**
 * 找到服務端Invoker
 *
 * @param serviceName 服務名
 * @return Invoker對象
 */
public Invoker findInvoker(String serviceName) {
    return invokerMap.get(serviceName);
}

根據服務名稱,從 Map 中找到服務,然後調用 invoker 的 invoker 方法。

再看看 Netty 到 BoltServerProcessor 的 handlerRequest 的調用鏈,使用 IDEA 的 Hierarchy 功能,查看該方法,最後停留在 ProcessTast 中,一個 Runnable.

技術分享圖片

根據經驗,這個類肯定是被放到線程池了。什麽時候放的呢?看看他的構造方法的 Hierarchy。

技術分享圖片

從圖中可以看到 ,Bolt 的 RpcHandler 的 channelRead 最終會調用 ProcessTask 的 構造方法。

那麽 BoltServer 的用戶線程池什麽時候使用呢?還是使用 IDEA 的 Hierarchy 功能。

技術分享圖片

其實也是在這個過程中,當用戶沒有設置線程池,則使用系統線程池。

總結

好了,關於 SOFA 的服務發布和服務的接收過程,就介紹完了,可以說,整個框架還是非常輕量級的。基本操作就是:內部通過在 Netty的 Handler 中保存一個存儲服務實現的 Map 完成遠程調用。

其實和我們之前用 Netty 寫的小 demo 類似。

SOFA 源碼分析 —— 服務發布過程