1. 程式人生 > >Dubbo(2)--Dubbo常用配置文件解析及核心源碼閱讀

Dubbo(2)--Dubbo常用配置文件解析及核心源碼閱讀

proto println on() 下使用 流量 key logger 不一定 適應

1.多版本支持

服務端

創建第二個接口實現類

package com.lf;

public class HelloImpl2 implements IHello{

    @Override
    public String sayHello(String msg) {
        return "hello,version 2.0"+ msg;
    }

}

修改配置:添加接口實現的版本號信息

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://code.alibabatech.com/schema/dubbo
            http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <!--提供方信息-->
 	<dubbo:application name="dubbo-server" owner="lf"/>
    <!--註冊中心192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181 file="f:/dubbo-server"-->
    <dubbo:registry address="zookeeper://192.168.25.131:2181"/> 
 
    <!--配置需要發布的協議及端口-->
 	<dubbo:protocol port="20880" name="dubbo"></dubbo:protocol>
    <!--需要發布的服務-->
    <dubbo:service interface="com.lf.IHello"
                   ref="gpHelloService" protocol="dubbo" version=‘1.0.0‘
/> <dubbo:service interface="com.lf.IHello" ref="gpHelloService2" protocol="dubbo" version="1.0.1"/> <!--需要發布的服務實現類--> <bean id="gpHelloService" class="com.lf.HelloImpl"/> <bean id="gpHelloService2" class="com.lf.HelloImpl2"/> </beans>

  啟動服務,註冊節點:

package com.lf;

import java.io.IOException;

import org.springframework.context.support.ClassPathXmlApplicationContext;

import com.alibaba.dubbo.container.Main;

public class BootStrap {
	
	public static void main(String[] args) throws IOException {
		ClassPathXmlApplicationContext classPathXmlApplicationContext =new  ClassPathXmlApplicationContext("META-INF/spring/dubbo-server.xml"); 
		classPathXmlApplicationContext.start();
		System.in.read();//阻塞當前線程
		//Main.main(new String[]{"spring","log4j"});//基於zookeeper內置容器啟動
	}
}

查看zookeeper節點信息:

dubbo%3A%2F%2F169.254.227.106%3A20880%2Fcom.lf.IHello%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.lf.IHello%26methods%3DsayHello%26owner%3Dlf%26pid%3D1416%26revision%3D1.0.0%26side%3Dprovider%26timestamp%3D1553606947565%26version%3D1.0.0, dubbo%3A%2F%2F169.254.227.106%3A20880%2Fcom.lf.IHello2%3Fanyhost%3Dtrue%26application%3Ddubbo-server%26dubbo%3D2.5.3%26interface%3Dcom.lf.IHello%26methods%3DsayHello%26owner%3Dlf%26pid%3D1108%26revision%3D1.0.1%26side%3Dprovider%26timestamp%3D1553606891782%26version%3D1.0.1

客戶端:

  配置信息添加版本號,獲取對應版本號的接口信息:

  

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://code.alibabatech.com/schema/dubbo
            http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <!--提供方信息 -->
    <dubbo:application name="dubbo-server" owner="lf" />
    <!--註冊中心192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181 file="f:/dubbo-server" -->
    <dubbo:registry address="zookeeper://192.168.25.131:2181" />

    <!--配置需要發布的協議及端口 -->
    <dubbo:protocol port="20880" name="dubbo"></dubbo:protocol>

    <dubbo:reference id="gpHelloService" interface="com.lf.IHello"
        protocol="dubbo" version =1.0.1/><!--這裏的protocol可以自己指定需要的協議 -->
</beans>

運行:

  

package com.lf;

import org.springframework.context.support.ClassPathXmlApplicationContext;

public class AppVersion {
	
	public static void main(String[] args) throws Exception {
		 ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("META-INF/spring/dubbo-client-version.xml");
		 
	        //得到IGpHello的遠程代理對象
	        IHello iGpHello = (IHello) context.getBean("gpHelloService");
	 
	        System.out.println(iGpHello.sayHello("lf"));
	        Thread.sleep(1000);
	        System.in.read();
	    }
	}

 結果:hello,version 2.0lf

修改上述配置為 1.0.0,則運行結果為:hellolf

若不指定版本信息,因服務端配置信息中都含有版本號,dubbo根據url獲取服務的,則會報錯

  2.主機綁定

發布一個Dubbo服務的時候,會生成一個dubbo://ip:port的協議地址,那麽這個IP是根據什麽生成的呢?可以在ServiceConfig.java代碼中 doExportUrlsFor1Protocol 方法找到如下代碼;可以發現,在生成綁定的主機的時候,會通過一層一層的判斷,直到獲取到合法的ip地址。以下源碼就是綁定 host 的判斷邏輯過程,對於不同協議綁定不同的端口。

1.NetUtils.isInvalidLocalHost(host), 從配置文件中獲取host
2.host = InetAddress.getLocalHost().getHostAddress();
3.Socket socket = new Socket();
try {
    SocketAddress addr = new InetSocketAddress(registryURL.getHost(), registryURL.getPort());
    socket.connect(addr, 1000);
    host = socket.getLocalAddress().getHostAddress();
    break;
} finally {
    try {
        socket.close();
    } catch (Throwable e) {}
}

  4.

技術分享圖片

  3.集群容錯

什麽是容錯機制? 容錯機制指的是某種系統控制在一定範圍內的一種允許或包容犯錯情況的發生,舉個簡單例子,我們在電腦上運行一個程序,有時候會出現無響應的情況,然後系統會彈出一個提示框讓我們選擇,是立即結束還是繼續等待,然後根據我們的選擇執行對應的操作,這就是“容錯”。

  在分布式架構下,網絡、硬件、應用都可能發生故障,由於各個服務之間可能存在依賴關系,如果一條鏈路中的其中一個節點出現故障,將會導致雪崩效應。為了減少某一個節點故障的影響範圍,所以我們才需要去構建容錯服務,來優雅的處理這種中斷的響應結果。

Dubbo提供了6種容錯機制,分別如下:

1.Failover 模式:

  失敗自動切換,當出現失敗,重試其它服務器。(缺省) ,通常用於讀操作,但重試會帶來更長延遲。 可通過retries=”2”來設置重試次數(不含第一次)。

2.Failfast :

  快速失敗,只發起一次調用,失敗立即報錯。通常用於非冪等性的寫操作,比如新增記錄。

3.Failsafe :

  失敗安全,出現異常時,直接忽略。通常用於寫入審計日誌等操作。

4.Failback :

  失敗自動恢復,後臺記錄失敗請求,定時重發。 通常用於消息通知操作。

5.Forking :

  並行調用多個服務器,只要一個成功即返回。通常用於實時性要求較高的讀操作,但需要浪費更多服務資源。 可通過forks=”2”來設置最大並行數。

6.Broadcast :

  廣播調用所有提供者,逐個調用,任意一臺報錯則報錯。(2.1.0開始支持) 通常用於通知所有提供者更新緩存或日誌等本地資源信息。

client 配置方式如下,通過cluster方式,配置指定的容錯方案:

  

<dubbo:reference id="gpHelloService"
                     interface="com.gupaoedu.dubbo.IGpHello"
                     protocol="dubbo" version="2.0.0"
                     cluster="failsafe"/><!--失敗安全,可以認為是把錯誤吞掉(記錄日誌)-->

  

  配置的優先級別:如果現在客戶端服務端同時配置了超時時間,但是數值不一樣,這個時候是客戶端優於服務端,在客戶端可以配置到細粒度的點可以配置指定服務的方法參數。方法級別優先,然後是接口,然後是全局配置。如果配置級別一樣,這個時候客戶端優先。retires、cluster、LoadBalance(最好配置在客戶端) timeout(配置在服務端比較好,客戶端也可以配置)

4.服務降級

降級的目的是為了保證核心服務可用。

降級可以有幾個層面的分類: 自動降級和人工降級; 按照功能可以分為:讀服務降級和寫服務降級;

1. 對一些非核心服務進行人工降級,在大促之前通過降級開關關閉哪些推薦內容、評價等對主流程沒有影響的功能

2. 故障降級,比如調用的遠程服務掛了,網絡故障、或者RPC服務返回異常。 那麽可以直接降級,降級的方案比如設置默認值、采用兜底數據(系統推薦的行為廣告掛了,可以提前準備靜態頁面做返回)等等

3. 限流降級,在秒殺這種流量比較集中並且流量特別大的情況下,因為突發訪問量特別大可能會導致系統支撐不了。這個時候可以采用限流來限制訪問量。當達到閥值時,後續的請求被降級,比如進入排隊頁面,比如跳轉到錯誤頁(活動太火爆,稍後重試等)

dubbo的降級方式: Mock

實現步驟:

1. 在client端創建一個TestMock類,實現對應IGpHello的接口(需要對哪個接口進行mock,就實現哪個),名稱必須以Mock結尾

  

package com.lf;

public class TestMock implements IHello{

	@Override
	public String sayHello(String msg) {
		
		return "系統繁忙"+msg;
	}

}

  

2. 在client端的xml配置文件中,添加如下配置,增加一個mock屬性指向創建的TestMock

3. 模擬錯誤(設置timeout=‘1‘),模擬超時異常,運行測試代碼即可訪問到TestMock這個類。當服務端故障解除以後,調用過程將恢復正常,

  配置如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
       xsi:schemaLocation="http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://code.alibabatech.com/schema/dubbo
            http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    <!--提供方信息-->
 	<dubbo:application name="dubbo-server" owner="lf"/>
    <!--註冊中心192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181 file="f:/dubbo-server"-->
    <dubbo:registry id="zk1" address="zookeeper://192.168.25.131:2181"/> 
 
    <!--配置需要發布的協議及端口-->
 	<dubbo:protocol port="20880" name="dubbo"></dubbo:protocol>
 	<!-- cluster="failsafe" timeout="50" || cluster="failsafe" timeout="1"-->
 	<dubbo:reference id="gpHelloService"
                     interface="com.lf.IHello"
                     protocol="dubbo"  timeout="1" mock="com.lf.TestMock"/>
</beans>

  運行結果:報錯,輸出:系統繁忙lf

  運行結果:輸出:hello,version 2.0lf

  運行結果:輸出:null (因cluster="failsafe" 采用了服務降級策略,吞掉這個錯誤,不會報錯


配置優先級別

  • timeout為例,顯示了配置的查找順序,其它retries, loadbalance等類似。
  1. 方法級優先,接口級次之,全局配置再次之。
  2. 如果級別一樣,則消費方優先,提供方次之。

其中,服務提供方配置,通過URL經由註冊中心傳遞給消費方。

建議由服務提供方設置超時,因為一個方法需要執行多長時間,服務提供方更清楚,如果一個消費方同時引用多個服務,就不需要關心每個服務的超時設置。

5.關於 Dubbo SPI:

  在Dubbo中,SPI是一個非常核心的機制,貫穿在幾乎所有的流程中。Dubbo是基於Java原生SPI機制思想的一個改進,所以,先從JAVA SPI機制開始了解什麽是SPI以後再去學習Dubbo的SPI,就比較容易了

  關於JAVA 的SPI機制:

  SPI全稱(service provider interface),是JDK內置的一種服務提供發現機制,目前市面上有很多框架都是用它來做服務的擴展發現,大家耳熟能詳的如JDBC、日誌框架都有用到;簡單來說,它是一種動態替換發現的機制。舉個簡單的例子,如果我們定義了一個規範,需要第三方廠商去實現,那麽對於我們應用方來說,只需要集成對應廠商的插件,既可以完成對應規範的實現機制。 形成一種插拔式的擴展手段。

  SPI的實際應用

  SPI在很多地方有應用,大家可以看看最常用的java.sql.Driver驅動。JDK官方提供了java.sql.Driver這個驅動擴展點,但是你們並沒有看到JDK中有對應的Driver實現。 那在哪裏實現呢?以連接Mysql為例,我們需要添加mysql-connector-java依賴。然後,你們可以在這個jar包中找到SPI的配置信息。如下圖,所以java.sql.Driver由各個數據庫廠商自行實現。這就是SPI的實際應用。當然除了這個意外,大家在spring的包中也可以看到相應的痕跡

  創建規範接口工程並打成jar:

package com.lf.spi;

public interface DateBaseDriver {
	
	String connect(String host);
}

  創建mysql連接工程(引入接口jar):

  

package com.lf.spi;

public class MysqlDriver implements DateBaseDriver{

	@Override
	public String connect(String host) {
		// TODO Auto-generated method stub
		return "mysql connection";
	}

}

  創建service文件:

  在resources/META-INF/services創建文件並命名com.lf.spi.DateBaseDriver,內容為實現類的全路徑:com.lf.spi.MysqlDriver

  在引用工程中,引入其jar:

  創建連接類:

  

package com.lf.spi;

import java.util.ServiceLoader;

public class DateBaseConnection {
	
	public static void main(String[] args) {
		ServiceLoader<DateBaseDriver> serviceLoader =ServiceLoader.load(DateBaseDriver.class);
		for(DateBaseDriver dateBaseDriver:serviceLoader){
			System.out.println(dateBaseDriver.connect("localhost"));
		}
	}
	
}

  輸出:mysql connection

 同樣方法:創建oracle驅動類

 引入兩個jar,會輸出:

  mysql connection

oracle connection

SPI的缺點

1. JDK標準的SPI會一次性加載實例化擴展點的所有實現,什麽意思呢?就是如果你在META-INF/service下的文件裏面加了N個實現類,那麽JDK啟動的時候都會一次性全部加載。那麽如果有的擴展點實現初始化很耗時或者如果有些實現類並沒有用到,那麽會很浪費資源

2. 如果擴展點加載失敗,會導致調用方報錯,而且這個錯誤很難定位到是這個原因

Dubbo優化後的SPI實現

基於Dubbo提供的SPI規範實現自己的擴展

在了解Dubbo的SPI機制之前,先通過一段代碼初步了解Dubbo的實現方式,這樣,我們就能夠形成一個對比,得到這兩種實現方式的差異

Dubbo的SPI機制規範

大部分的思想都是和SPI是一樣,只是下面兩個地方有差異。

1. 需要在resource目錄下配置META-INF/dubbo或者META-INF/dubbo/internal或者META-INF/services,並基於SPI接口去創建一個文件

2. 文件名稱和接口名稱保持一致,文件內容和SPI有差異,內容是KEY對應Value

代碼實現:

創建擴展RPC協議接口實現類:

  

package com.lf.spi;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;

public class DubboProtocol implements Protocol{

       @Override
	public int getDefaultPort() {
		// TODO Auto-generated method stub
		return 8888;
	}    

	@Override
	public void destroy() {
		// TODO Auto-generated method stub
		
	}

	@Override
	public <T> Exporter<T> export(Invoker<T> arg0) throws RpcException {
		// TODO Auto-generated method stub
		return null;
	}

	@Override
	public <T> Invoker<T> refer(Class<T> arg0, URL arg1) throws RpcException {
		// TODO Auto-generated method stub
		return null;
	}

}
    

  配置文件:在resources/META-INFdubbo創建文件並命名com.alibaba.dubbo.rpc.Protocol,內容為實現類的全路徑:MyProtocol=com.lf.spi.DubboProtocol

  創建測試類:

public class DubboProtocolTest {

	
	public static void main(String[] args) {
		Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("MyProtocol");
		System.out.println(protocol.getDefaultPort());
	}
}

  輸出:8888

  

Dubbo SPI機制源碼閱讀

多同學在聽完我講的課程以後,不知道為什麽分析這塊代碼的源碼。其實前面有提到過,我們必須要知道,下面這兩段代碼,到底做了什麽,以及會返回一個什麽樣的結果,如果這個不清楚,後續的代碼閱讀,你就沒辦法清晰學習。

ps: 源碼閱讀,帶著疑問去了解【為什麽傳入一個myProtocol就能獲得自定義的DefineProtocol對象】、 getAdaptiveExtension是一個什麽東西?

Protocol protocol = ExtensionLoader. getExtensionLoader(Protocol.class). getExtension("myProtocol");
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();

  

源碼閱讀入口

接下來的源碼分析,是基於下面這段代碼作為入口,至於為什麽不用上面提到的第一段代碼作為入口。理由如下

  1. 在下節課講的分析dubbo源碼之服務發布過程中,會有下面這段代碼的體現。
  2. 分下完下面這段代碼,對於第一段代碼的理解,會很容易
Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();

把上面這段代碼分成兩段,一段是getExtensionLoader 另一段是getAdaptiveExtension 初步猜想一下;


第一段是通過一個Class參數去獲得一個ExtensionLoader對象,有點類似一個工廠模式。把上面這段代碼分成兩段,一段是
getExtensionLoader 另一段是getAdaptiveExtension 初步猜想一下;

第二段getAdaptiveExtension,去獲得一個自適應的擴展點

 

Extension源碼的結構

了解源碼結構,建立一個全局認識。結構圖如下

技術分享圖片

初步了解這些代碼在擴展點中的痕跡。

Protocol源碼

一下是Protocol的源碼,在這個源碼中可以看到有兩個註解,一個是在類級別上的@SPI(“dubbo”). 另一個是@Adaptive

@SPI 表示當前這個接口是一個擴展點,可以實現自己的擴展實現,默認的擴展點是DubboProtocol。

@Adaptive 表示一個自適應擴展點,在方法級別上,會動態生成一個適配器類

@SPI("dubbo")
public interface Protocol {
    
    /**
     * 獲取缺省端口,當用戶沒有配置端口時使用。
     * 
     * @return 缺省端口
     */
    int getDefaultPort();

    /**
     * 暴露遠程服務:<br>
     * 1. 協議在接收請求時,應記錄請求來源方地址信息:RpcContext.getContext().setRemoteAddress();<br>
     * 2. export()必須是冪等的,也就是暴露同一個URL的Invoker兩次,和暴露一次沒有區別。<br>
     * 3. export()傳入的Invoker由框架實現並傳入,協議不需要關心。<br>
     * 
     * @param <T> 服務的類型
     * @param invoker 服務的執行體
     * @return exporter 暴露服務的引用,用於取消暴露
     * @throws RpcException 當暴露服務出錯時拋出,比如端口已占用
     */
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;

    /**
     * 引用遠程服務:<br>
     * 1. 當用戶調用refer()所返回的Invoker對象的invoke()方法時,協議需相應執行同URL遠端export()傳入的Invoker對象的invoke()方法。<br>
     * 2. refer()返回的Invoker由協議實現,協議通常需要在此Invoker中發送遠程請求。<br>
     * 3. 當url中有設置check=false時,連接失敗不能拋出異常,並內部自動恢復。<br>
     * 
     * @param <T> 服務的類型
     * @param type 服務的類型
     * @param url 遠程服務的URL地址
     * @return invoker 服務的本地代理
     * @throws RpcException 當連接服務提供方失敗時拋出
     */
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;

    /**
     * 釋放協議:<br>
     * 1. 取消該協議所有已經暴露和引用的服務。<br>
     * 2. 釋放協議所占用的所有資源,比如連接和端口。<br>
     * 3. 協議在釋放後,依然能暴露和引用新的服務。<br>
     */
    void destroy();
}
getExtensionLoader
該方法需要一個Class類型的參數,該參數表示希望加載的擴展點類型,該參數必須是接口,且該接口必須被@SPI註解註釋,否則拒絕處理。檢查通過之後首先會檢查ExtensionLoader緩存中是否已經存在該擴展對應的ExtensionLoader,如果有則直接返回,否則創建一個新的ExtensionLoader負責加載該擴展實現,同時將其緩存起來。可以看到對於每一個擴展,dubbo中只會有一個對應的ExtensionLoader實例

@SuppressWarnings("unchecked")
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    if(!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if(!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type + 
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }
    
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}
ExtensionLoader提供了一個私有的構造函數,並且在這裏面對兩個成員變量type/objectFactory進行賦值。而objectFactory賦值的意義是什麽呢?先留個懸念
private ExtensionLoader(Class<?> type) {
    this.type = type;
    objectFactory = (type == ExtensionFactory.class ? null :
            ExtensionLoader.getExtensionLoader(ExtensionFactory.class).
                    getAdaptiveExtension());
}

  

getAdaptiveExtension

通過getExtensionLoader獲得了對應的ExtensionLoader實例以後,再調用getAdaptiveExtension()方法來獲得一個自適應擴展點。接下來我們來看看代碼的實現

ps:簡單對自適應擴展點做一個解釋,大家一定了解過適配器設計模式,而這個自適應擴展點實際上就是一個適配器。

這個方法裏面主要做幾個事情:

  1. cacheAdaptiveInstance 這個內存緩存中獲得一個對象實例
  2. 如果實例為空,說明是第一次加載,則通過雙重檢查鎖的方式去創建一個適配器擴展點

   

public T getAdaptiveExtension() {
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if(createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
        else {
            throw new IllegalStateException("fail to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);
        }
    }
    return (T) instance;
}

  

createAdaptiveExtension

這段代碼裏面有兩個結構,一個是injectExtension. 另一個是getAdaptiveExtensionClass()

我們需要先去了解getAdaptiveExtensionClass這個方法做了什麽?很顯然,從後面的.newInstance來看,應該是獲得一個類並且進行實例)

private T createAdaptiveExtension() {
    try {
        //可以實現擴展點的註入
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
    }
}

  

getAdaptiveExtensionClass

從類名來看,是獲得一個適配器擴展點的類。

在這段代碼中,做了兩個事情

  1. getExtensionClasses() 加載所有路徑下的擴展點
  2. createAdaptiveExtensionClass() 動態創建一個擴展點

cachedAdaptiveClass這裏有個判斷,用來判斷當前Protocol這個擴展點是否存在一個自定義的適配器,如果有,則直接返回自定義適配器,否則,就動態創建,這個值是在getExtensionClasses中賦值的,這塊代碼我們稍後再看

private Class<?> getAdaptiveExtensionClass() {
    getExtensionClasses();
    //TODO  不一定?
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

  

createAdaptiveExtensionClass

動態生成適配器代碼,以及動態編譯

  1. createAdaptiveExtensionClassCode, 動態創建一個字節碼文件。返回code這個字符串
  2. 通過compiler.compile進行編譯(默認情況下使用的是javassist)
  3. 通過ClassLoader加載到jvm中
    //創建一個適配器擴展點。(創建一個動態的字節碼文件)
    private Class<?> createAdaptiveExtensionClass() {
        //生成字節碼代碼
        String code = createAdaptiveExtensionClassCode();
        //獲得類加載器
        ClassLoader classLoader = findClassLoader();
        com.alibaba.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();
        //動態編譯字節碼
        return compiler.compile(code, classLoader);
    }
    

    code的字節碼內容

    public class Protocol$Adaptive implements com.alibaba.dubbo.rpc.Protocol {
        public void destroy() {
            throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public int getDefaultPort() {
            throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!");
        }
    
        public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws com.alibaba.dubbo.rpc.RpcException {
            if (arg1 == null) throw new IllegalArgumentException("url == null");
            com.alibaba.dubbo.common.URL url = arg1;
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.refer(arg0, arg1);
        }
    
        public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.RpcException {
            if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null");
            if (arg0.getUrl() == null)
                throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");
            com.alibaba.dubbo.common.URL url = arg0.getUrl();
            String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
            if (extName == null)
                throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
            com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName);
            return extension.export(arg0);
        }
    }
    

    Protocol$Adaptive的主要功能

    1. 從url或擴展接口獲取擴展接口實現類的名稱;

    2.根據名稱,獲取實現類ExtensionLoader.getExtensionLoader(擴展接口類).getExtension(擴展接口實現類名稱),然後調用實現類的方法。

    需要明白一點dubbo的內部傳參基本上都是基於Url來實現的,也就是說Dubbo是基於URL驅動的技術

    所以,適配器類的目的是在運行期獲取擴展的真正實現來調用,解耦接口和實現,這樣的話要不我們自己實現適配器類,要不dubbo幫我們生成,而這些都是通過Adpative來實現。

    到目前為止,我們的AdaptiveExtension的主線走完了,可以簡單整理一下他們的調用關系如下

技術分享圖片

我們再回過去梳理下代碼,實際上在調用createAdaptiveExtensionClass之前,還做了一個操作。是執行getExtensionClasses方法,我們來看看這個方法做了什麽事情

getExtensionClasses

getExtensionClasses這個方法,就是加載擴展點實現類了。這段代碼本來應該先看的,但是擔心先看這段代碼會容易導致大家不好理解。我就把順序交換了下

這段代碼主要做如下幾個事情

  1. cachedClasses中獲得一個結果,這個結果實際上就是所有的擴展點類,key對應name,value對應class
  2. 通過雙重檢查鎖進行判斷

調用loadExtensionClasses,去加載左右擴展點的實現

//加載擴展點的實現類
private Map<String, Class<?>> getExtensionClasses() {
       
       Map<String, Class<?>> classes = cachedClasses.get();
       if (classes == null) {
           synchronized (cachedClasses) {
               classes = cachedClasses.get();
               if (classes == null) {
                   classes = loadExtensionClasses();
                   cachedClasses.set(classes);
               }
           }
       }
       return classes;
}

  

loadExtensionClasses

從不同目錄去加載擴展點的實現,在最開始的時候講到過的。META-INF/dubbo ;META-INF/internal ; META-INF/services

主要邏輯

  1. 獲得當前擴展點的註解,也就是Protocol.class這個類的註解,@SPI
  2. 判斷這個註解不為空,則再次獲得@SPI中的value值
  3. 如果value有值,也就是@SPI(“dubbo”),則講這個dubbo的值賦給cachedDefaultName。這就是為什麽我們能夠通過

ExtensionLoader.getExtensionLoader(Protocol.class).getDefaultExtension() ,能夠獲得DubboProtocol這個擴展點的原因

  1. 最後,通過loadFile去加載指定路徑下的所有擴展點。也就是META-INF/dubbo;META-INF/internal;META-INF/services

// 此方法已經getExtensionClasses方法同步過。
private Map<String, Class<?>> loadExtensionClasses() {
    //type->Protocol.class
    //得到SPI的註解
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if(defaultAnnotation != null) { //如果不等於空.
        String value = defaultAnnotation.value();
        if(value != null && (value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if(names.length > 1) {
                throw new IllegalStateException("more than 1 default extension name on extension " + type.getName()
                        + ": " + Arrays.toString(names));
            }
            if(names.length == 1) cachedDefaultName = names[0];
        }
    }
    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadFile(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
    loadFile(extensionClasses, DUBBO_DIRECTORY);
    loadFile(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses;
}

loadFile

解析指定路徑下的文件,獲取對應的擴展點,通過反射的方式進行實例化以後,put到extensionClasses這個Map集合中

private void loadFile(Map<String, Class<?>> extensionClasses, String dir) {
    String fileName = dir + type.getName();
    try {
        Enumeration<java.net.URL> urls;
        ClassLoader classLoader = findClassLoader();
        if (classLoader != null) {
            urls = classLoader.getResources(fileName);
        } else {
            urls = ClassLoader.getSystemResources(fileName);
        }
        if (urls != null) {
            while (urls.hasMoreElements()) {
                java.net.URL url = urls.nextElement();
                try {
                    BufferedReader reader = new BufferedReader(new InputStreamReader(url.openStream(), "utf-8"));
                    try {
                        String line = null;
                        while ((line = reader.readLine()) != null) {
                            final int ci = line.indexOf(‘#‘);
                            if (ci >= 0) line = line.substring(0, ci);
                            line = line.trim();
                            if (line.length() > 0) {
                                try {
                                    String name = null;
                                    int i = line.indexOf(‘=‘);
                                    if (i > 0) {//文件采用name=value方式,通過i進行分割
                                        name = line.substring(0, i).trim();
                                        line = line.substring(i + 1).trim();
                                    }
                                    if (line.length() > 0) {
                                        Class<?> clazz = Class.forName(line, true, classLoader);
                                        //加載對應的實現類,並且判斷實現類必須是當前的加載的擴展點的實現
                                        if (! type.isAssignableFrom(clazz)) {
                                            throw new IllegalStateException("Error when load extension class(interface: " +
                                                    type + ", class line: " + clazz.getName() + "), class " 
                                                    + clazz.getName() + "is not subtype of interface.");
                                        }

                                        //判斷是否有自定義適配類,如果有,則在前面講過的獲取適配類的時候,直接返回當前的自定義適配類,不需要再動態創建
// 還記得在前面講過的getAdaptiveExtensionClass中有一個判斷嗎?是用來判斷cachedAdaptiveClass是不是為空的。如果不為空,表示存在自定義擴展點。也就不會去動態生成字節碼了。這個地方可以得到一個簡單的結論;
// @Adaptive如果是加在類上, 表示當前類是一個自定義的自適應擴展點
//如果是加在方法級別上,表示需要動態創建一個自適應擴展點,也就是Protocol$Adaptive
                                        if (clazz.isAnnotationPresent(Adaptive.class)) {
                                            if(cachedAdaptiveClass == null) {
                                                cachedAdaptiveClass = clazz;
                                            } else if (! cachedAdaptiveClass.equals(clazz)) {
                                                throw new IllegalStateException("More than 1 adaptive class found: "
                                                        + cachedAdaptiveClass.getClass().getName()
                                                        + ", " + clazz.getClass().getName());
                                            }
                                        } else {
                                            try {
                                                //如果沒有Adaptive註解,則判斷當前類是否帶有參數是type類型的構造函數,如果有,則認為是
                                                //wrapper類。這個wrapper實際上就是對擴展類進行裝飾.
                                                //可以在dubbo-rpc-api/internal下找到Protocol文件,發現Protocol配置了3個裝飾
                                                //分別是,filter/listener/mock. 所以Protocol這個實例來說,會增加對應的裝飾器
                                                clazz.getConstructor(type);//
                                                //得到帶有public DubboProtocol(Protocol protocol)的擴展點。進行包裝
                                                Set<Class<?>> wrappers = cachedWrapperClasses;
                                                if (wrappers == null) {
                                                    cachedWrapperClasses = new ConcurrentHashSet<Class<?>>();
                                                    wrappers = cachedWrapperClasses;
                                                }
                                                wrappers.add(clazz);//包裝類 ProtocolFilterWrapper(ProtocolListenerWrapper(Protocol))
                                            } catch (NoSuchMethodException e) {
                                                clazz.getConstructor();
                                                if (name == null || name.length() == 0) {
                                                    name = findAnnotationName(clazz);
                                                    if (name == null || name.length() == 0) {
                                                        if (clazz.getSimpleName().length() > type.getSimpleName().length()
                                                                && clazz.getSimpleName().endsWith(type.getSimpleName())) {
                                                            name = clazz.getSimpleName().substring(0, clazz.getSimpleName().length() - type.getSimpleName().length()).toLowerCase();
                                                        } else {
                                                            throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + url);
                                                        }
                                                    }
                                                }
                                                String[] names = NAME_SEPARATOR.split(name);
                                                if (names != null && names.length > 0) {
                                                    Activate activate = clazz.getAnnotation(Activate.class);
                                                    if (activate != null) {
                                                        cachedActivates.put(names[0], activate);
                                                    }
                                                    for (String n : names) {
                                                        if (! cachedNames.containsKey(clazz)) {
                                                            cachedNames.put(clazz, n);
                                                        }
                                                        Class<?> c = extensionClasses.get(n);
                                                        if (c == null) {
                                                            extensionClasses.put(n, clazz);
                                                        } else if (c != clazz) {
                                                            throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + n + " on " + c.getName() + " and " + clazz.getName());
                                                        }
                                                    }
                                                }
                                            }
                                        }
                                    }
                                } catch (Throwable t) {
                                    IllegalStateException e = new IllegalStateException("Failed to load extension class(interface: " + type + ", class line: " + line + ") in " + url + ", cause: " + t.getMessage(), t);
                                    exceptions.put(line, e);
                                }
                            }
                        } // end of while read lines
                    } finally {
                        reader.close();
                    }
                } catch (Throwable t) {
                    logger.error("Exception when load extension class(interface: " +
                                        type + ", class file: " + url + ") in " + url, t);
                }
            } // end of while urls
        }
    } catch (Throwable t) {
        logger.error("Exception when load extension class(interface: " +
                type + ", description file: " + fileName + ").", t);
    }
}

  

階段性小結

截止到目前,我們已經把基於Protocol的自適應擴展點看完了。也明白最終這句話應該返回的對象是什麽了.

Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class). getAdaptiveExtension();

也就是,這段代碼中,最終的protocol應該等於= Protocol$Adaptive

技術分享圖片

injectExtension

還記得這段代碼嗎?上面分析的代碼,只是知道,最終getAdaptiveExtensionClass.new Instance獲得一個自適應擴展點。而還有一段代碼

injectExtension沒有講。簡單來說,這個方法的作用,是為這個自適應擴展點進行依賴註入。類似於spring裏面的依賴註入功能。

為適配器類的setter方法插入其他擴展點或實現。(這塊代碼課堂上還沒講,大家抽空先看看)

private T createAdaptiveExtension() {
    try {
        //可以實現擴展點的註入
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("Can not create adaptive extenstion " + type + ", cause: " + e.getMessage(), e);
    }
}

  

最後留一個問題給大家?

getExtensionLoader這個方法中,會調用ExtensionLoader的私有構造方法進行初始化,其中有一個objectFactory. 這個是幹嘛的呢?先想想~

Dubbo(2)--Dubbo常用配置文件解析及核心源碼閱讀