1. 程式人生 > >動態代理在RPC框架中應用

動態代理在RPC框架中應用

例項

1.第一個例項取自黃勇的輕量級分散式 RPC 框架demo(https://gitee.com/huangyong/rpc) ,由於實現中通訊框架使用了Netty,所以在分析中會有部分Netty程式碼的資訊,不過不用擔心,即使不懂Netty,講解的過程中會盡量避免,並會突出反射與動態代理在其中的作用。
在rpc-simple-client中HelloClient.Class有如下程式碼:

HelloService helloService = rpcProxy.create(HelloService.class);
String result = helloService.hello("World");
System.out.println(result);

這個程式碼做的是什麼事呢?通過一個代理生成helloService物件,執行hello方法。 
在我們印象中執行方法,最終都會執行的是介面中實現的方法。那事實是這樣嗎?看下面的分析。 
在rpcProxy程式碼如下:

public <T> T create(final Class<?> interfaceClass, final String serviceVersion) {
        // 建立動態代理物件
        return (T) Proxy.newProxyInstance(
                interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 建立 RPC 請求物件並設定請求屬性
                        RpcRequest request = new RpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(method.getDeclaringClass().getName());
                        request.setServiceVersion(serviceVersion);
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);
                        // 獲取 RPC 服務地址
                        if (serviceDiscovery != null) {
                            String serviceName = interfaceClass.getName();
                            if (StringUtil.isNotEmpty(serviceVersion)) {
                                serviceName += "-" + serviceVersion;
                            }
                            serviceAddress = serviceDiscovery.discover(serviceName);
                            LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress);
                        }
                        if (StringUtil.isEmpty(serviceAddress)) {
                            throw new RuntimeException("server address is empty");
                        }
                        // 從 RPC 服務地址中解析主機名與埠號
                        String[] array = StringUtil.split(serviceAddress, ":");
                        String host = array[0];
                        int port = Integer.parseInt(array[1]);
                        // 建立 RPC 客戶端物件併發送 RPC 請求
                        RpcClient client = new RpcClient(host, port);
                        long time = System.currentTimeMillis();
                        RpcResponse response = client.send(request);
                        LOGGER.debug("time: {}ms", System.currentTimeMillis() - time);
                        if (response == null) {
                            throw new RuntimeException("response is null");
                        }
                        // 返回 RPC 響應結果
                        if (response.hasException()) {
                            throw response.getException();
                        } else {
                            return response.getResult();
                        }
                    }
                }
        );
    }

從上面的程式碼可以看出經過了代理,執行hello方法,其實是發起一個請求。既然是一個請求,就是要涉及Client端與Server端,上面其實是一個Clent端程式碼。
那我們看看Server做了什麼,去掉一個和本文所介紹不相關的程式碼,在RpcServerHandler中可以看核心程式碼如下:

public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception {
   Object result = handle(request);
   response.setResult(result);
   ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);  //返回,非同步關閉連線
}
  其中hanlde中重要實現如下
       // 獲取反射呼叫所需的引數,這些都是Client端傳輸給我們的。
  Class<?> serviceClass = serviceBean.getClass();
  String methodName = request.getMethodName();
  Class<?>[] parameterTypes = request.getParameterTypes();
  Object[] parameters = request.getParameters();
   // 使用 CGLib 執行反射呼叫
   FastClass serviceFastClass = FastClass.create(serviceClass);
   FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
   return serviceFastMethod.invoke(serviceBean, parameters);

2.第二個例項取自xxl-job分散式任務排程平臺 
說明:此開源專案的,RPC通訊是用Jetty來實現的。

在xxl-job-admin中XxlJobTrigger.Class的runExecutor有如下:

ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address);  //根據地址拿到執行器
  runResult = executorBiz.run(triggerParam);

做了很簡單的是取出執行器,觸發執行。但是進入getExecutorBiz方法你會發現如下:

executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address, 
                           accessToken).getObject();
  executorBizRepository.put(address, executorBiz);
  return executorBiz;

是不是很熟悉,沒錯,動態代理,看是NetComClientProxy的實現:
在結構上是不是和第一個例項中的rpcProxy程式碼,很相似呢。
new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();做了什麼呢?

public Object getObject() throws Exception {
        return Proxy.newProxyInstance(Thread.currentThread()
                .getContextClassLoader(), new Class[] { iface },
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                        // request封裝
                        RpcRequest request = new RpcRequest();
                        request.setServerAddress(serverAddress);
                        request.setCreateMillisTime(System.currentTimeMillis());
                        request.setAccessToken(accessToken);
                        request.setClassName(method.getDeclaringClass().getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(args);

                        // send傳送
                        RpcResponse response = client.send(request);

                        // valid response
                        if (response == null) {
                            logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
                            throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
                        }
                        if (response.isError()) {
                            throw new RuntimeException(response.getError());
                        } else {
                            return response.getResult();
                        }

                    }
                });
    }

依舊是封裝了一個RpcRequest ,傳送請求。所以在 runResult = executorBiz.run(triggerParam)
其實是在傳送一個請求。上面是Client端程式碼,照舊,接著看Server程式碼,你會發現還是似成相識。去掉與本文無關的程式碼,得到如下:
在xxl-job-core中JettyServerHandler.Class有 :

RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
點選進入:
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class<?> serviceClass = serviceBean.getClass();  //類名
            String methodName = request.getMethodName();    //方法名run
            Class<?>[] parameterTypes = request.getParameterTypes();  //引數型別
            Object[] parameters = request.getParameters();   //具體引數

            FastClass serviceFastClass = FastClass.create(serviceClass);
            FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
            // 使用 CGLib 執行反射呼叫
            Object result = serviceFastMethod.invoke(serviceBean, parameters);
            response.setResult(result);
        } catch (Throwable t) {
            t.printStackTrace();
            response.setError(t.getMessage());
        }
        return response;
}

根據反射生成具體的類,來執行相關的方法,達到想要的目的。 
上面兩個例項的過程可以用下圖概括: 
具體過程.png

 

RPC,遠端過程呼叫。就是呼叫遠端機器上的方法。

原理其實很簡單,就是客戶端上執行的程式在呼叫物件方法時,底層將針對該方法的呼叫

/**
 * 封裝 RPC 請求
 *
 * @author huangyong
 * @since 1.0.0
 */
public class RpcRequest {

    private String requestId;//請求的id
    private String interfaceName;//目標方法實現了哪些介面
    private String serviceVersion;
    private String methodName;//目標方法名
    private Class<?>[] parameterTypes;//目標方法的入參型別
    private Object[] parameters;//目標方法的入參

將其作為TCP/HTTP請求的引數傳送遠端伺服器,遠端伺服器監聽固定埠,收到這個TCP/HTTP請求後會解析出相關資訊,即:根據client端傳過來的資料反射呼叫服務端的方法,包括客戶端想要呼叫哪個類的哪個方法,引數是什麼等,然後進行對應的呼叫,將呼叫結果再通過資料包發回即可。