1. 程式人生 > >徒手擼框架--實現 RPC 遠端呼叫

徒手擼框架--實現 RPC 遠端呼叫

微服務,已經是每個網際網路開發者必須掌握的一項技術。而 RPC 框架,是構成微服務最重要的組成部分之一。趁最近有時間。又看了看 dubbo 的原始碼。dubbo 為了做到靈活和解耦,使用了大量的設計模式和 SPI機制,要看懂 dubbo 的程式碼也不太容易。

按照《徒手擼框架》系列文章的套路,我還是會極簡的實現一個 RPC 框架。幫助大家理解 RPC 框架的原理。

廣義的來講一個完整的 RPC 包含了很多元件,包括服務發現,服務治理,遠端呼叫,呼叫鏈分析,閘道器等等。我將會慢慢的實現這些功能,這篇文章主要先講解的是 RPC 的基石,遠端呼叫 的實現。

相信,讀完這篇文章你也一定可以自己實現一個可以提供 RPC 呼叫的框架。

  1. RPC 的呼叫過程 通過一圖我們來了解一下 RPC 的呼叫過程,從巨集觀上來看看到底一次 RPC 呼叫經過些什麼過程。

當一次呼叫開始:

圖片描述(最多50字)

client 會呼叫本地動態代理 proxy 這個代理會將呼叫通過協議轉序列化位元組流 通過 netty 網路框架,將位元組流傳送到服務端 服務端在受到這個位元組流後,會根據協議,反序列化為原始的呼叫,利用反射原理呼叫服務方提供的方法 如果請求有返回值,又需要把結果根據協議序列化後,再通過 netty 返回給呼叫方 2. 框架概覽和技術選型 看一看框架的元件:

圖片描述(最多50字)

clinet就是呼叫方。servive是服務的提供者。protocol包定義了通訊協議。common包含了通用的一些邏輯元件。

技術選型專案使用 maven 作為包管理工具,json 作為序列化協議,使用spring boot管理物件的生命週期,netty作為 nio 的網路元件。所以要閱讀這篇文章,你需要對spring boot和netty有基本的瞭解。

下面就看看每個元件的具體實現:

  1. protocol 其實作為 RPC 的協議,需要考慮只有一個問題–就是怎麼把一次方法的呼叫,變成能夠被網路傳輸的位元組流。

首先我們需要定義方法的呼叫和返回兩個實體:

請求:

@Data public class RpcRequest { // 呼叫編號 private String requestId; // 類名 private String className; // 方法名 private String methodName; // 請求引數的資料型別 private Class<?>[] parameterTypes; // 請求的引數 private Object[] parameters; } 結果:

@Data public class RpcResponse { // 呼叫編號 private String requestId; // 丟擲的異常 private Throwable throwable; // 返回結果 private Object result; } 確定了,需要序列化的物件,就要確定序列化的協議,實現兩個方法,序列化和反序列化兩個方法。

public interface Serialization { byte[] serialize(T obj); T deSerialize(byte[] data,Class clz); } 可選用的序列化的協議很多比如:

jdk 的序列化方法。(不推薦,不利於之後的跨語言呼叫) json 可讀性強,但是序列化速度慢,體積大。 protobuf,kyro,Hessian 等都是優秀的序列化框架,也可按需選擇。 為了簡單和便於除錯,我們就選擇 json 作為序列化協議,使用jackson作為 json 解析框架。

/**

  • @author Zhengxin */ public class JsonSerialization implements Serialization { private ObjectMapper objectMapper; public JsonSerialization(){ this.objectMapper = new ObjectMapper(); } @Override public byte[] serialize(T obj) { try { return objectMapper.writeValueAsBytes(obj); } catch (JsonProcessingException e) { e.printStackTrace(); } return null; } @Override public T deSerialize(byte[] data, Class clz) { try { return objectMapper.readValue(data,clz); } catch (IOException e) { e.printStackTrace(); } return null; } } 因為 netty 支援自定義 coder 。所以只需要實現 ByteToMessageDecoder 和 MessageToByteEncoder 兩個介面。就解決了序列化的問題:

public class RpcDecoder extends ByteToMessageDecoder { private Class<?> clz; private Serialization serialization; public RpcDecoder(Class<?> clz,Serialization serialization){ this.clz = clz; this.serialization = serialization; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { if(in.readableBytes() < 4){ return; } in.markReaderIndex(); int dataLength = in.readInt(); if (in.readableBytes() < dataLength) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLength]; in.readBytes(data); Object obj = serialization.deSerialize(data, clz); out.add(obj); } } public class RpcEncoder extends MessageToByteEncoder { private Class<?> clz; private Serialization serialization; public RpcEncoder(Class<?> clz, Serialization serialization){ this.clz = clz; this.serialization = serialization; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if(clz != null){ byte[] bytes = serialization.serialize(msg); out.writeInt(bytes.length); out.writeBytes(bytes); } } } 至此,protocol 就實現了,我們就可以把方法的呼叫和結果的返回,轉換為一串可以在網路中傳輸的 byte[] 陣列了。

  1. server server 是負責處理客戶端請求的元件。在網際網路高併發的環境下,使用 Nio 非阻塞的方式可以相對輕鬆的應付高併發的場景。netty 是一個優秀的 Nio 處理框架。Server 的關鍵程式碼如下:

netty 是基於 Recotr 模型的。所以需要初始化兩組執行緒 boss 和 worker 。boss 負責分發請求,worker 負責執行相應的 handler:

@Bean public ServerBootstrap serverBootstrap() throws InterruptedException { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup(), workerGroup()) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(serverInitializer); Map<ChannelOption<?>, Object> tcpChannelOptions = tcpChannelOptions(); Set<ChannelOption<?>> keySet = tcpChannelOptions.keySet(); for (@SuppressWarnings(“rawtypes”) ChannelOption option : keySet) { serverBootstrap.option(option, tcpChannelOptions.get(option)); } return serverBootstrap; } netty 的操作是基於 pipeline 的。所以我們需要把在 protocol 實現的幾個 coder 註冊到 netty 的 pipeline 中。

ChannelPipeline pipeline = ch.pipeline(); // 處理 tcp 請求中粘包的 coder,具體作用可以自行 google pipeline.addLast(new LengthFieldBasedFrameDecoder(65535,0,4)); // protocol 中實現的 序列化和反序列化 coder pipeline.addLast(new RpcEncoder(RpcResponse.class,new JsonSerialization())); pipeline.addLast(new RpcDecoder(RpcRequest.class,new JsonSerialization())); // 具體處理請求的 handler 下文具體解釋 pipeline.addLast(serverHandler); 實現具體的 ServerHandler 用於處理真正的呼叫。 ServerHandler 繼承 SimpleChannelInboundHandler。簡單來說這個 InboundHandler 會在資料被接受時或者對於的 Channel 的狀態發生變化的時候被呼叫。當這個 handler 讀取資料的時候方法 channelRead0() 會被用,所以我們就重寫這個方法就夠了。

@Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception { RpcResponse rpcResponse = new RpcResponse(); rpcResponse.setRequestId(msg.getRequestId()); try{ // 收到請求後開始處理請求 Object handler = handler(msg); rpcResponse.setResult(handler); }catch (Throwable throwable){ // 如果丟擲異常也將異常存入 response 中 rpcResponse.setThrowable(throwable); throwable.printStackTrace(); } // 操作完以後寫入 netty 的上下文中。netty 自己處理返回值。 ctx.writeAndFlush(rpcResponse); } handler(msg) 實際上使用的是 cglib 的 Fastclass 實現的,其實根本原理,還是反射。學好 java 中的反射真的可以為所欲為。

private Object handler(RpcRequest request) throws Throwable { Class<?> clz = Class.forName(request.getClassName()); Object serviceBean = applicationContext.getBean(clz); Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // 根本思路還是獲取類名和方法名,利用反射實現呼叫 FastClass fastClass = FastClass.create(serviceClass); FastMethod fastMethod = fastClass.getMethod(methodName,parameterTypes); // 實際呼叫發生的地方 return fastMethod.invoke(serviceBean,parameters); } 總體上來看,server 的實現不是很困難。核心的知識點是 netty 的 channel 的使用和 cglib 的反射機制。

  1. client future

其實,對於我來說,client 的實現難度,遠遠大於 server 的實現。netty 是一個非同步框架,所有的返回都是基於 Future 和 Callback 的機制。

所以在閱讀以下文字前強烈推薦,我之前寫的一篇文章 Future 研究。利用經典的 wite 和 notify 機制,實現非同步的獲取請求的結果。

/**

  • @author zhengxin */ public class DefaultFuture { private RpcResponse rpcResponse; private volatile boolean isSucceed = false; private final Object object = new Object(); public RpcResponse getResponse(int timeout){ synchronized (object){ while (!isSucceed){ try { //wait object.wait(timeout); } catch (InterruptedException e) { e.printStackTrace(); } } return rpcResponse; } } public void setResponse(RpcResponse response){ if(isSucceed){ return; } synchronized (object) { this.rpcResponse = response; this.isSucceed = true; //notiy object.notify(); } } } 複用資源

為了能夠提升 client 的吞吐量,可提供的思路有以下幾種:

使用物件池:建立多個 client 以後儲存在物件池中。但是程式碼的複雜度和維護 client 的成本會很高。 儘可能的複用 netty 中的 channel。 之前你可能注意到,為什麼要在 RpcRequest 和 RpcResponse 中增加一個 ID。因為 netty 中的 channel 是會被多個執行緒使用的。當一個結果非同步的返回後,你並不知道是哪個執行緒返回的。這個時候就可以考慮利用一個 Map,建立一個 ID 和 Future 對映。這樣請求的執行緒只要使用對應的 ID 就能獲取,相應的返回結果。

/**

  • @author Zhengxin */ public class ClientHandler extends ChannelDuplexHandler { // 使用 map 維護 id 和 Future 的對映關係,在多執行緒環境下需要使用執行緒安全的容器 private final Map<String, DefaultFuture> futureMap = new ConcurrentHashMap<>(); @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { if(msg instanceof RpcRequest){ RpcRequest request = (RpcRequest) msg; // 寫資料的時候,增加對映 futureMap.putIfAbsent(request.getRequestId(),new DefaultFuture()); } super.write(ctx, msg, promise); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if(msg instanceof RpcResponse){ RpcResponse response = (RpcResponse) msg; // 獲取資料的時候 將結果放入 future 中 DefaultFuture defaultFuture = futureMap.get(response.getRequestId()); defaultFuture.setResponse(response); } super.channelRead(ctx, msg); } public RpcResponse getRpcResponse(String requestId){ try { // 從 future 中獲取真正的結果。 DefaultFuture defaultFuture = futureMap.get(requestId); return defaultFuture.getResponse(10); }finally { // 完成後從 map 中移除。 futureMap.remove(requestId); } } } 這裡沒有繼承 server 中的 InboundHandler 而使用了 ChannelDuplexHandler。顧名思義就是在寫入和讀取資料的時候,都會觸發相應的方法。寫入的時候在 Map 中儲存 ID 和 Future。讀到資料的時候從 Map 中取出 Future 並將結果放入 Future 中。獲取結果的時候需要對應的 ID。

使用 Transporters 對請求進行封裝。

public class Transporters { public static RpcResponse send(RpcRequest request){ NettyClient nettyClient = new NettyClient(“127.0.0.1”, 8080); nettyClient.connect(nettyClient.getInetSocketAddress()); RpcResponse send = nettyClient.send(request); return send; } } 動態代理的實現

動態代理技術最廣為人知的應用,應該就是 Spring Aop,面向切面的程式設計實現。動態的在原有方法Before 或者 After 新增程式碼。而 RPC 框架中動態代理的作用就是徹底替換原有方法,直接呼叫遠端方法。

代理工廠類:

public class ProxyFactory { @SuppressWarnings(“unchecked”) public static T create(Class interfaceClass){ return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new RpcInvoker(interfaceClass) ); } } 當 proxyFactory 生成的類被呼叫的時候,就會執行 RpcInvoker 方法。

public class RpcInvoker implements InvocationHandler { private Class clz; public RpcInvoker(Class clz){ this.clz = clz; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { RpcRequest request = new RpcRequest(); String requestId = UUID.randomUUID().toString(); String className = method.getDeclaringClass().getName(); String methodName = method.getName(); Class<?>[] parameterTypes = method.getParameterTypes(); request.setRequestId(requestId); request.setClassName(className); request.setMethodName(methodName); request.setParameterTypes(parameterTypes); request.setParameters(args); return Transporters.send(request).getResult(); } } 看到這個 invoke 方法,主要三個作用,

生成 RequestId。 拼裝 RpcRequest。 呼叫 Transports 傳送請求,獲取結果。 至此終於,整個呼叫鏈完整了。我們終於完成了一次 RPC 呼叫。

與 Spring 整合

為了使我們的 client 能夠易於使用我們需要考慮,定義一個自定義註解 @RpcInterface 當我們的專案接入 Spring 以後,Spring 掃描到這個註解之後,自動的通過我們的 ProxyFactory 建立代理物件,並存放在 spring 的 applicationContext 中。這樣我們就可以通過 @Autowired 註解直接注入使用了。

@Target({ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) public @interface RpcInterface { }

@Configuration @Slf4j public class RpcConfig implements ApplicationContextAware,InitializingBean { private ApplicationContext applicationContext; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; } @Override public void afterPropertiesSet() throws Exception { Reflections reflections = new Reflections(“com.xilidou”); DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) applicationContext.getAutowireCapableBeanFactory(); // 獲取 @RpcInterfac 標註的介面 Set<Class<?>> typesAnnotatedWith = reflections.getTypesAnnotatedWith(RpcInterface.class); for (Class<?> aClass : typesAnnotatedWith) { // 建立代理物件,並註冊到 spring 上下文。 beanFactory.registerSingleton(aClass.getSimpleName(),ProxyFactory.create(aClass)); } log.info(“afterPropertiesSet is {}”,typesAnnotatedWith); } } 終於我們最簡單的 RPC 框架就開發完了。下面可以測試一下。

  1. Demo api

@RpcInterface public interface IHelloService { String sayHi(String name); } server

IHelloSerivce 的實現:

@Service @Slf4j public class TestServiceImpl implements IHelloService { @Override public String sayHi(String name) { log.info(name); return "Hello " + name; } } 啟動服務:

@SpringBootApplication public class Application { public static void main(String[] args) throws InterruptedException { ConfigurableApplicationContext context = SpringApplication.run(Application.class); TcpService tcpService = context.getBean(TcpService.class); tcpService.start(); } } ` client

@SpringBootApplication() public class ClientApplication { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(ClientApplication.class); IHelloService helloService = context.getBean(IHelloService.class); System.out.println(helloService.sayHi(“doudou”)); } } 執行以後輸出的結果:

Hello doudou

總結 終於我們實現了一個最簡版的 RPC 遠端呼叫的模組。