曹工說mini-dubbo(1)--為了實踐動態代理,我寫了個簡單的rpc框架
阿新 • • 發佈:2020-03-16
#相關背景及資源:
之前本來一直在寫spring原始碼解析這塊,如下,aop部分剛好寫完。以前零散看過一些文章,知道rpc呼叫基本就是使用動態代理,比如rmi,dubbo,feign呼叫等。自己也就想著試一下,於是有了mini-dubbo這個東西,暫時也不能稱為一個框架,因為還不是生產級的,目前只是實現了一部分小功能,也沒有監控,也沒有xxx,反正就是缺的比較多。
[曹工說Spring Boot原始碼(22)-- 你說我Spring Aop依賴AspectJ,我依賴它什麼了](https://www.cnblogs.com/grey-wolf/p/12418425.html)
我就說下,裡面用到的知識點吧,有興趣的,可以克隆原始碼下來看看:
1. 動態代理
2. 服務註冊和消費,使用redis作為註冊中心,其中使用了redisson作為redis客戶端,其中涉及到BeanFactoryPostProcessor的使用
3. 因為傳輸層使用netty和mina,是非同步的,但是上層又需要等待結果,所以用到了同步轉非同步
4. spring的xml解析,bean definition註冊,spring 擴充套件xml 名稱空間
5. 自定義的spi的相關知識
6. 分層思想,從dubbo借鑑了其分層,但是mini-dubbo要少幾層,因為我暫時不是很清楚dubbo的每一層的具體職責,所以我按我自己理解分的層。上層依賴下層,只通過下層的介面,查詢下層介面時,直接在spring容器中查詢bean即可,類似於spring mvc的設計。當下層有多個實現時,通過類似spi機制來指定具體要使用的下層實現。
7. 基於第5點,所以本框架非常容易替換各層的實現,只要自己自定義一個spring bean,實現對應的介面,然後在spi檔案中指定本實現的類名即可。
8. netty和mina的tcp粘包拆包工作。
# 概要
程式碼我放在瞭如下位置:
我介紹下程式碼的整體結構:
![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316090226583-955250144.png)
服務端聚合工程比較簡單,目前也沒時間去仔細弄,包含了如下module:
```xml
../mini-dubbo-api
../mini-dubbo-server
../mini-dubbo-core
../mini-dubbo-common
```
目前的大部分實現,是在客戶端,包含了如下module:
```xml
../mini-dubbo-api
../mini-dubbo-client
../mini-dubbo-core
../mini-dubbo-common
../mini-dubbo-registry-layer
../mini-dubbo-cluster-layer
../mini-dubbo-exchange-layer
../mini-dubbo-transport-layer-netty
../mini-dubbo-transport-layer-mina
```
其中,模組間的依賴關係如下:
業務模組,一般只需要依賴mini-dubbo-core模組,mini-dubbo-core主要依賴瞭如下模組:
![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316090846887-1110246996.png)
為什麼這麼劃分,因為mini-dubbo-core模組,其實主要是完成解析業務模組(比如client)中的xml,根據其xml配置,註冊對應的bean到spring 容器中,而具體的bean實現,就是放在各個模組的,比如,xml裡配置netty作為傳輸層實現,那麼mini-dubbo-core就得解析為mini-dubbo-transport-layer-netty中的一個實現類作為bean,註冊到spring容器,供上層使用。
目前的分層,只是暫時的,後續可能會略有調整。
#一次客戶端呼叫的大體思路
1. 業務module中,配置xml,示例如下:
```xml
```
其中的dubbo:reference就代表了一個遠端的服務,業務程式碼中可以自動注入該介面,當呼叫該介面時,實際就會發起rpc呼叫。
熟悉的同學已經知道了,這塊肯定是生成了一個動態代理。
2. 繼續之前,我們看看dubbo的十層架構:
![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316091921530-1759141384.png)
可以看到,我們這邊是比dubbo少了幾層,首先proxy,目前直接用了jdk動態代理,沒有其他技術,所以就沒有抽出一層;然後monitor層,現在肯定是沒有的,這部分其實才是一個框架的重頭戲,但是我也不會前端,所以這塊估計暫時沒有;接下來是protocol層,我暫時不太清楚dubbo的設計,所以就沒弄這層。
3. 知道了分層結構後,我們可以回到第一點,即動態代理那裡,我們的動態代理,只依賴下層的介面。目前,各層之間的介面,放在mini-dubbo-common模組中,定義如下:
* 註冊中心層,負責接收上層傳來的呼叫引數等上下文,並返回結果
```java
/**
* 註冊中心層的rpc呼叫者
* 1:接收上層傳下來的業務引數,並返回結果
*
* 本層:會根據不同實現,去相應的註冊中心,獲取匹配的服務提供者列表,傳輸給下一層
*/
public interface RegistryLayerRpcInvoker {
Object invoke(RpcContext rpcContext);
}
```
* 叢集層,接收上層註冊中心層傳來的服務提供者列表和rpc呼叫上下文,並返回最終結果
```java
public interface ClusterLayerRpcInvoker {
/**
* 由註冊中心層提供對應service的服務提供者列表,本方法可以根據負載均衡策略,進行篩選
* @param providerList
* @param rpcContext
* @return
*/
Object invoke(List providerList, RpcContext rpcContext);
}
```
* exchange層,上層叢集層,會替我們選好某一臺具體的服務提供者,然後讓我們去呼叫,本層完成同步轉非同步
```java
public interface ExchangeLayerRpcInvoker {
/**
*
* @param providerHostAndPort 要呼叫的服務提供者的地址
* @param rpcContext rpc上下文,包含了要呼叫的引數等
* @return rpc呼叫的結果
*/
Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext);
}
```
* 傳輸層,本層目前有兩個簡單實現,netty和mina。
```java
/**
*
* 本層為傳輸層,上層為exchange層。
* 上層exchange,目前有一個預設實現,主要是完成同步轉非同步的操作。
* 上層將具體的傳輸工作交給底層的傳輸層,比如netty和mina,然後在一個future上等待傳輸層完成工作
*
* 本層會完成實際的傳送工作和接收返回響應的工作
*/
public interface TransportLayerRpcInvoker {
/**
*
* @param providerHostAndPort 要呼叫的服務提供者的地址
* @param rpcContext rpc上下文,包含了要呼叫的引數等
* @return rpc呼叫的結果
*/
Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext);
}
```
其中,我們的最上邊的動態代理層,只依賴於下層,其中,示例程式碼如下:
```java
@Override
public Object invoke(Object proxy, Method method, Object[] args) {
// 1.從spring容器中,獲取下層的實現bean;如果有多個,則根據spi檔案中指定的為準
RegistryLayerRpcInvoker registryLayerRpcInvoker =
SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class);
RpcContext rpcContext = new RpcContext();
rpcContext.setProxy(proxy);
rpcContext.setMethod(method);
rpcContext.setArgs(args);
rpcContext.setServiceName(method.getDeclaringClass().getName());
// 2.呼叫下層
Object o = registryLayerRpcInvoker.invoke(rpcContext);
return o;
}
```
這裡1處,可以看到,我們通過SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class)去獲取具體的下層實現,這是我們自定義的一個工具類,其內部實現一會再說。
2處呼叫下層實現,獲取結果。
4. registry,註冊中心層的實現
```java
@Service
public class RedisRegistryRpcInvoker implements RegistryLayerRpcInvoker {
@Autowired
private RedisRegistry redisRegistry;
@Override
public Object invoke(RpcContext rpcContext) {
//1.獲取叢集層實現
ClusterLayerRpcInvoker clusterLayerRpcInvoker = SpiServiceLoader.loadService(ClusterLayerRpcInvoker.class);
//2.從redis中,根據服務名,獲取服務提供者列表
List list = redisRegistry.getServiceProviderList(rpcContext.getServiceName());
if (CollectionUtils.isEmpty(list)) {
throw new RuntimeException();
}
//2.呼叫叢集層實現,獲取結果
Object o = clusterLayerRpcInvoker.invoke(list, rpcContext);
return o;
}
}
```
5. 叢集層實現,本層我也不算懂,模仿dubbo實現了一下。
主要實現了以下兩種:
* Failover,出現失敗,立即重試其他伺服器。可以設定重試次數。
* Failfast,請求失敗以後,返回異常結果,不進行重試。
以failover為例:
```java
@Slf4j
@Service
public class FailoverClusterLayerRpcInvoker implements ClusterLayerRpcInvoker {
@Autowired
private LoadBalancePolicy loadBalancePolicy;
@Override
public Object invoke(List providerList, RpcContext rpcContext) {
ExchangeLayerRpcInvoker exchangeLayerRpcInvoker =
SpiServiceLoader.loadService(ExchangeLayerRpcInvoker.class);
int retryTimes = 3;
for (int i = 0; i < retryTimes; i++) {
// 1.根據負載均衡策略,選擇1臺服務提供者
ProviderHostAndPort providerHostAndPort = loadBalancePolicy.selectOne(providerList);
try {
// 呼叫下層,獲取結果
Object o = exchangeLayerRpcInvoker.invoke(providerHostAndPort, rpcContext);
return o;
} catch (Exception e) {
log.error("fail to invoke {},exception:{},will try another",
providerHostAndPort,e);
// 2.如果呼叫失敗,進入下一次迴圈
continue;
}
}
throw new RuntimeException("fail times extend");
}
}
```
其中,一共會嘗試3次,每次的邏輯:根據負載均衡策略,選擇1臺去呼叫;如果有問題,則換一臺。
呼叫下層時,獲取了下層的介面:ExchangeLayerRpcInvoker
6. exchange層,這層完成同步轉非同步的操作,目前只有一個實現:
```java
@Service
public class Sync2AsyncExchangeImpl implements ExchangeLayerRpcInvoker {
public static ConcurrentHashMap> requestId2futureMap =
new ConcurrentHashMap<>();
@Override
public Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext) {
String requestId = UUID.randomUUID().toString();
rpcContext.setRequestId(requestId);
rpcContext.setRequestId2futureMap(requestId2futureMap);
CompletableFuture