1. 程式人生 > >曹工說mini-dubbo(1)--為了實踐動態代理,我寫了個簡單的rpc框架

曹工說mini-dubbo(1)--為了實踐動態代理,我寫了個簡單的rpc框架

#相關背景及資源: 之前本來一直在寫spring原始碼解析這塊,如下,aop部分剛好寫完。以前零散看過一些文章,知道rpc呼叫基本就是使用動態代理,比如rmi,dubbo,feign呼叫等。自己也就想著試一下,於是有了mini-dubbo這個東西,暫時也不能稱為一個框架,因為還不是生產級的,目前只是實現了一部分小功能,也沒有監控,也沒有xxx,反正就是缺的比較多。 [曹工說Spring Boot原始碼(22)-- 你說我Spring Aop依賴AspectJ,我依賴它什麼了](https://www.cnblogs.com/grey-wolf/p/12418425.html) 我就說下,裡面用到的知識點吧,有興趣的,可以克隆原始碼下來看看: 1. 動態代理 2. 服務註冊和消費,使用redis作為註冊中心,其中使用了redisson作為redis客戶端,其中涉及到BeanFactoryPostProcessor的使用 3. 因為傳輸層使用netty和mina,是非同步的,但是上層又需要等待結果,所以用到了同步轉非同步 4. spring的xml解析,bean definition註冊,spring 擴充套件xml 名稱空間 5. 自定義的spi的相關知識 6. 分層思想,從dubbo借鑑了其分層,但是mini-dubbo要少幾層,因為我暫時不是很清楚dubbo的每一層的具體職責,所以我按我自己理解分的層。上層依賴下層,只通過下層的介面,查詢下層介面時,直接在spring容器中查詢bean即可,類似於spring mvc的設計。當下層有多個實現時,通過類似spi機制來指定具體要使用的下層實現。 7. 基於第5點,所以本框架非常容易替換各層的實現,只要自己自定義一個spring bean,實現對應的介面,然後在spi檔案中指定本實現的類名即可。 8. netty和mina的tcp粘包拆包工作。 # 概要 程式碼我放在瞭如下位置: 我介紹下程式碼的整體結構: ![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316090226583-955250144.png) 服務端聚合工程比較簡單,目前也沒時間去仔細弄,包含了如下module: ```xml ../mini-dubbo-api ../mini-dubbo-server ../mini-dubbo-core ../mini-dubbo-common ``` 目前的大部分實現,是在客戶端,包含了如下module: ```xml ../mini-dubbo-api
../mini-dubbo-client ../mini-dubbo-core ../mini-dubbo-common ../mini-dubbo-registry-layer ../mini-dubbo-cluster-layer ../mini-dubbo-exchange-layer ../mini-dubbo-transport-layer-netty ../mini-dubbo-transport-layer-mina
``` 其中,模組間的依賴關係如下: 業務模組,一般只需要依賴mini-dubbo-core模組,mini-dubbo-core主要依賴瞭如下模組: ![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316090846887-1110246996.png) 為什麼這麼劃分,因為mini-dubbo-core模組,其實主要是完成解析業務模組(比如client)中的xml,根據其xml配置,註冊對應的bean到spring 容器中,而具體的bean實現,就是放在各個模組的,比如,xml裡配置netty作為傳輸層實現,那麼mini-dubbo-core就得解析為mini-dubbo-transport-layer-netty中的一個實現類作為bean,註冊到spring容器,供上層使用。 目前的分層,只是暫時的,後續可能會略有調整。 #一次客戶端呼叫的大體思路 1. 業務module中,配置xml,示例如下: ```xml
``` 其中的dubbo:reference就代表了一個遠端的服務,業務程式碼中可以自動注入該介面,當呼叫該介面時,實際就會發起rpc呼叫。 熟悉的同學已經知道了,這塊肯定是生成了一個動態代理。 2. 繼續之前,我們看看dubbo的十層架構: ![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316091921530-1759141384.png) 可以看到,我們這邊是比dubbo少了幾層,首先proxy,目前直接用了jdk動態代理,沒有其他技術,所以就沒有抽出一層;然後monitor層,現在肯定是沒有的,這部分其實才是一個框架的重頭戲,但是我也不會前端,所以這塊估計暫時沒有;接下來是protocol層,我暫時不太清楚dubbo的設計,所以就沒弄這層。 3. 知道了分層結構後,我們可以回到第一點,即動態代理那裡,我們的動態代理,只依賴下層的介面。目前,各層之間的介面,放在mini-dubbo-common模組中,定義如下: * 註冊中心層,負責接收上層傳來的呼叫引數等上下文,並返回結果 ```java /** * 註冊中心層的rpc呼叫者 * 1:接收上層傳下來的業務引數,並返回結果 * * 本層:會根據不同實現,去相應的註冊中心,獲取匹配的服務提供者列表,傳輸給下一層 */ public interface RegistryLayerRpcInvoker { Object invoke(RpcContext rpcContext); } ``` * 叢集層,接收上層註冊中心層傳來的服務提供者列表和rpc呼叫上下文,並返回最終結果 ```java public interface ClusterLayerRpcInvoker { /** * 由註冊中心層提供對應service的服務提供者列表,本方法可以根據負載均衡策略,進行篩選 * @param providerList * @param rpcContext * @return */ Object invoke(List providerList, RpcContext rpcContext); } ``` * exchange層,上層叢集層,會替我們選好某一臺具體的服務提供者,然後讓我們去呼叫,本層完成同步轉非同步 ```java public interface ExchangeLayerRpcInvoker { /** * * @param providerHostAndPort 要呼叫的服務提供者的地址 * @param rpcContext rpc上下文,包含了要呼叫的引數等 * @return rpc呼叫的結果 */ Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext); } ``` * 傳輸層,本層目前有兩個簡單實現,netty和mina。 ```java /** * * 本層為傳輸層,上層為exchange層。 * 上層exchange,目前有一個預設實現,主要是完成同步轉非同步的操作。 * 上層將具體的傳輸工作交給底層的傳輸層,比如netty和mina,然後在一個future上等待傳輸層完成工作 * * 本層會完成實際的傳送工作和接收返回響應的工作 */ public interface TransportLayerRpcInvoker { /** * * @param providerHostAndPort 要呼叫的服務提供者的地址 * @param rpcContext rpc上下文,包含了要呼叫的引數等 * @return rpc呼叫的結果 */ Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext); } ``` 其中,我們的最上邊的動態代理層,只依賴於下層,其中,示例程式碼如下: ```java @Override public Object invoke(Object proxy, Method method, Object[] args) { // 1.從spring容器中,獲取下層的實現bean;如果有多個,則根據spi檔案中指定的為準 RegistryLayerRpcInvoker registryLayerRpcInvoker = SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class); RpcContext rpcContext = new RpcContext(); rpcContext.setProxy(proxy); rpcContext.setMethod(method); rpcContext.setArgs(args); rpcContext.setServiceName(method.getDeclaringClass().getName()); // 2.呼叫下層 Object o = registryLayerRpcInvoker.invoke(rpcContext); return o; } ``` 這裡1處,可以看到,我們通過SpiServiceLoader.loadService(RegistryLayerRpcInvoker.class)去獲取具體的下層實現,這是我們自定義的一個工具類,其內部實現一會再說。 2處呼叫下層實現,獲取結果。 4. registry,註冊中心層的實現 ```java @Service public class RedisRegistryRpcInvoker implements RegistryLayerRpcInvoker { @Autowired private RedisRegistry redisRegistry; @Override public Object invoke(RpcContext rpcContext) { //1.獲取叢集層實現 ClusterLayerRpcInvoker clusterLayerRpcInvoker = SpiServiceLoader.loadService(ClusterLayerRpcInvoker.class); //2.從redis中,根據服務名,獲取服務提供者列表 List list = redisRegistry.getServiceProviderList(rpcContext.getServiceName()); if (CollectionUtils.isEmpty(list)) { throw new RuntimeException(); } //2.呼叫叢集層實現,獲取結果 Object o = clusterLayerRpcInvoker.invoke(list, rpcContext); return o; } } ``` 5. 叢集層實現,本層我也不算懂,模仿dubbo實現了一下。 主要實現了以下兩種: * Failover,出現失敗,立即重試其他伺服器。可以設定重試次數。 * Failfast,請求失敗以後,返回異常結果,不進行重試。 以failover為例: ```java @Slf4j @Service public class FailoverClusterLayerRpcInvoker implements ClusterLayerRpcInvoker { @Autowired private LoadBalancePolicy loadBalancePolicy; @Override public Object invoke(List providerList, RpcContext rpcContext) { ExchangeLayerRpcInvoker exchangeLayerRpcInvoker = SpiServiceLoader.loadService(ExchangeLayerRpcInvoker.class); int retryTimes = 3; for (int i = 0; i < retryTimes; i++) { // 1.根據負載均衡策略,選擇1臺服務提供者 ProviderHostAndPort providerHostAndPort = loadBalancePolicy.selectOne(providerList); try { // 呼叫下層,獲取結果 Object o = exchangeLayerRpcInvoker.invoke(providerHostAndPort, rpcContext); return o; } catch (Exception e) { log.error("fail to invoke {},exception:{},will try another", providerHostAndPort,e); // 2.如果呼叫失敗,進入下一次迴圈 continue; } } throw new RuntimeException("fail times extend"); } } ``` 其中,一共會嘗試3次,每次的邏輯:根據負載均衡策略,選擇1臺去呼叫;如果有問題,則換一臺。 呼叫下層時,獲取了下層的介面:ExchangeLayerRpcInvoker 6. exchange層,這層完成同步轉非同步的操作,目前只有一個實現: ```java @Service public class Sync2AsyncExchangeImpl implements ExchangeLayerRpcInvoker { public static ConcurrentHashMap> requestId2futureMap = new ConcurrentHashMap<>(); @Override public Object invoke(ProviderHostAndPort providerHostAndPort, RpcContext rpcContext) { String requestId = UUID.randomUUID().toString(); rpcContext.setRequestId(requestId); rpcContext.setRequestId2futureMap(requestId2futureMap); CompletableFuture completableFuture = new CompletableFuture<>(); requestId2futureMap.put(requestId, completableFuture); /** * 交給具體的底層去解決 */ TransportLayerRpcInvoker transportLayerRpcInvoker = SpiServiceLoader.loadService(TransportLayerRpcInvoker .class); transportLayerRpcInvoker.invoke(providerHostAndPort, rpcContext); Object s = null; try { s = completableFuture.get(); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return s; } } ``` 這層大家可以簡單理解為:主執行緒呼叫傳輸層之前,生成一個id和一個completablefuture,放到一個全域性map,然後將id傳給下層,然後在completablefuture上阻塞;下層拿到id後,在訊息裡傳輸;服務端再將id傳輸回來,然後客戶端拿著id找到completablefuture,並喚醒主執行緒。 7. 資訊傳輸層,以netty為例,具體的netty相關的知識,大家就得自己先學習一下: 簡單步驟如下: ```java //1.初始化客戶端連線 public void initChannel() { Bootstrap b = configBootStrap(); ChannelFuture future = null; try { future = b.connect(providerHostAndPort.getHost(), providerHostAndPort.getPort()).sync(); if (future.isSuccess()) { channel = future.channel(); return; } } catch (InterruptedException e) { ... } throw new RuntimeException(); } private Bootstrap configBootStrap() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer() { @Override public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline p = ch.pipeline(); p.addLast("lengthFieldPrepender", new LengthFieldPrepender(2)); p.addLast("lengthFieldBasedFrameDecoder", new LengthFieldBasedFrameDecoder( 65536, 0, 2, 0, 2)); p.addLast("decoder", new StringDecoder()); p.addLast("encoder", new StringEncoder()); p.addLast(new ClientHandler()); }//攔截器設定 }); return b; } ``` 使用連線的channle,傳送資料: ```java public void sendMessage(String messageContent) { synchronized (lockObj) { if (channel == null) { initChannel(); } } ChannelFuture channelFuture = channel.writeAndFlush(messageContent); channelFuture.addListener(new GenericFutureListener>() { @Override public void operationComplete(Future future) throws Exception { System.out.println("傳送請求訊息成功"); } }); } ``` 8. netty接收到服務端相應後,根據requestId來獲取future,喚醒上層執行緒 ```java @Slf4j public class ClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext cx) { log.info("channelActive,local address:{},remote address:{}", cx.channel().localAddress(),cx.channel().remoteAddress()); } /** * 讀取資訊 * * @param ctx 渠道連線物件 * @param msg 資訊 * @throws Exception */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ResponseVO responseVO = JSONObject.parseObject((String) msg, ResponseVO.class); String requestId = responseVO.getRequestId(); //1.獲取future CompletableFuture completableFuture = Netty4ClientRpcInvoker.requestId2futureMap .get(requestId); //2.將結果塞進future,在此future上阻塞的執行緒被喚醒 completableFuture.complete(responseVO.getContent()); log.info("client channelRead,thread:{}", Thread.currentThread()); log.info("客戶端端讀寫遠端地址是-----------" + ctx.channel().remoteAddress() + "資訊是:" + msg.toString()); } } ``` # 如何根據spi進行切換 之前我們提到了可以根據spi,隨意切換實現,比如我們想使用mina來傳輸的話: ![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200316095845967-668089616.png) 這裡的spi的原理也很簡單: ```java dubbo.learn.common.spi.SpiServiceLoader#loadService public static T loadService(Class clazz) { //先查詢快取 Object cached = spiName2ServiceMap.get(clazz.getName()); if (cached != null) { return (T) cached; } //2.從spring容器獲取該class的全部實現bean Map map = applicationContext.getBeansOfType(clazz); if (CollectionUtils.isEmpty(map)) { return null; } if (map.size() == 1) { Object o = map.values().iterator().next(); return clazz.cast(o); } //讀取spi檔案,獲取使用者指定的實現 String s = SpiParser.getSpiForSpecifiedService(clazz); if (StringUtils.isEmpty(s)) { log.error("發現多個服務實現bean:{},且在spi中未指定要使用的bean",map); throw new RuntimeException(); } // 根據使用者spi中的實現,來返回相應的bean Object specifiedServiceInSpiFile = map.values().stream().filter(v -> Objects.equals(v.getClass().getName(), s)) .findFirst().orElse(null); if (specifiedServiceInSpiFile == null) { log.error("spi中指定的服務在bean集合中未找到。" + "發現多個服務實現bean:{},在spi中指定的服務為:{}",map,s); throw new RuntimeException(); } spiName2ServiceMap.put(clazz.getName(),specifiedServiceInSpiFile); return (T) specifiedServiceInSpiFile; } ``` # 總結 裡面細節比較多,最近工作比較忙,所以,大家可以先把程式碼弄下來,直接自己執行下,依賴的就只有一個redis而已。 後續我會接著優化該框架,歡迎大家加進來,一起開發;如果覺得還不錯,就star一下吧。 原始碼路徑: