動態代理在RPC框架中應用
例項
1.第一個例項取自黃勇的輕量級分散式 RPC 框架demo(https://gitee.com/huangyong/rpc) ,由於實現中通訊框架使用了Netty,所以在分析中會有部分Netty程式碼的資訊,不過不用擔心,即使不懂Netty,講解的過程中會盡量避免,並會突出反射與動態代理在其中的作用。
在rpc-simple-client中HelloClient.Class有如下程式碼:
HelloService helloService = rpcProxy.create(HelloService.class); String result = helloService.hello("World"); System.out.println(result);
這個程式碼做的是什麼事呢?通過一個代理生成helloService物件,執行hello方法。
在我們印象中執行方法,最終都會執行的是介面中實現的方法。那事實是這樣嗎?看下面的分析。
在rpcProxy程式碼如下:
public <T> T create(final Class<?> interfaceClass, final String serviceVersion) { // 建立動態代理物件 return (T) Proxy.newProxyInstance( interfaceClass.getClassLoader(), new Class<?>[]{interfaceClass}, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { // 建立 RPC 請求物件並設定請求屬性 RpcRequest request = new RpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(method.getDeclaringClass().getName()); request.setServiceVersion(serviceVersion); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(args); // 獲取 RPC 服務地址 if (serviceDiscovery != null) { String serviceName = interfaceClass.getName(); if (StringUtil.isNotEmpty(serviceVersion)) { serviceName += "-" + serviceVersion; } serviceAddress = serviceDiscovery.discover(serviceName); LOGGER.debug("discover service: {} => {}", serviceName, serviceAddress); } if (StringUtil.isEmpty(serviceAddress)) { throw new RuntimeException("server address is empty"); } // 從 RPC 服務地址中解析主機名與埠號 String[] array = StringUtil.split(serviceAddress, ":"); String host = array[0]; int port = Integer.parseInt(array[1]); // 建立 RPC 客戶端物件併發送 RPC 請求 RpcClient client = new RpcClient(host, port); long time = System.currentTimeMillis(); RpcResponse response = client.send(request); LOGGER.debug("time: {}ms", System.currentTimeMillis() - time); if (response == null) { throw new RuntimeException("response is null"); } // 返回 RPC 響應結果 if (response.hasException()) { throw response.getException(); } else { return response.getResult(); } } } ); }
從上面的程式碼可以看出經過了代理,執行hello方法,其實是發起一個請求。既然是一個請求,就是要涉及Client端與Server端,上面其實是一個Clent端程式碼。
那我們看看Server做了什麼,去掉一個和本文所介紹不相關的程式碼,在RpcServerHandler中可以看核心程式碼如下:
public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception { Object result = handle(request); response.setResult(result); ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); //返回,非同步關閉連線 } 其中hanlde中重要實現如下 // 獲取反射呼叫所需的引數,這些都是Client端傳輸給我們的。 Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); // 使用 CGLib 執行反射呼叫 FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters);
2.第二個例項取自xxl-job分散式任務排程平臺
說明:此開源專案的,RPC通訊是用Jetty來實現的。
在xxl-job-admin中XxlJobTrigger.Class的runExecutor有如下:
ExecutorBiz executorBiz = XxlJobDynamicScheduler.getExecutorBiz(address); //根據地址拿到執行器
runResult = executorBiz.run(triggerParam);
做了很簡單的是取出執行器,觸發執行。但是進入getExecutorBiz方法你會發現如下:
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address,
accessToken).getObject();
executorBizRepository.put(address, executorBiz);
return executorBiz;
是不是很熟悉,沒錯,動態代理,看是NetComClientProxy的實現:
在結構上是不是和第一個例項中的rpcProxy程式碼,很相似呢。
new NetComClientProxy(ExecutorBiz.class, address, accessToken).getObject();做了什麼呢?
public Object getObject() throws Exception {
return Proxy.newProxyInstance(Thread.currentThread()
.getContextClassLoader(), new Class[] { iface },
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// request封裝
RpcRequest request = new RpcRequest();
request.setServerAddress(serverAddress);
request.setCreateMillisTime(System.currentTimeMillis());
request.setAccessToken(accessToken);
request.setClassName(method.getDeclaringClass().getName());
request.setMethodName(method.getName());
request.setParameterTypes(method.getParameterTypes());
request.setParameters(args);
// send傳送
RpcResponse response = client.send(request);
// valid response
if (response == null) {
logger.error(">>>>>>>>>>> xxl-rpc netty response not found.");
throw new Exception(">>>>>>>>>>> xxl-rpc netty response not found.");
}
if (response.isError()) {
throw new RuntimeException(response.getError());
} else {
return response.getResult();
}
}
});
}
依舊是封裝了一個RpcRequest ,傳送請求。所以在 runResult = executorBiz.run(triggerParam)
其實是在傳送一個請求。上面是Client端程式碼,照舊,接著看Server程式碼,你會發現還是似成相識。去掉與本文無關的程式碼,得到如下:
在xxl-job-core中JettyServerHandler.Class有 :
RpcResponse rpcResponse = NetComServerFactory.invokeService(rpcRequest, null);
點選進入:
public static RpcResponse invokeService(RpcRequest request, Object serviceBean) {
Class<?> serviceClass = serviceBean.getClass(); //類名
String methodName = request.getMethodName(); //方法名run
Class<?>[] parameterTypes = request.getParameterTypes(); //引數型別
Object[] parameters = request.getParameters(); //具體引數
FastClass serviceFastClass = FastClass.create(serviceClass);
FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes);
// 使用 CGLib 執行反射呼叫
Object result = serviceFastMethod.invoke(serviceBean, parameters);
response.setResult(result);
} catch (Throwable t) {
t.printStackTrace();
response.setError(t.getMessage());
}
return response;
}
根據反射生成具體的類,來執行相關的方法,達到想要的目的。
上面兩個例項的過程可以用下圖概括:
RPC,遠端過程呼叫。就是呼叫遠端機器上的方法。
原理其實很簡單,就是客戶端上執行的程式在呼叫物件方法時,底層將針對該方法的呼叫:
/** * 封裝 RPC 請求 * * @author huangyong * @since 1.0.0 */ public class RpcRequest { private String requestId;//請求的id private String interfaceName;//目標方法實現了哪些介面 private String serviceVersion; private String methodName;//目標方法名 private Class<?>[] parameterTypes;//目標方法的入參型別 private Object[] parameters;//目標方法的入參
將其作為TCP/HTTP請求的引數傳送遠端伺服器,遠端伺服器監聽固定埠,收到這個TCP/HTTP請求後會解析出相關資訊,即:根據client端傳過來的資料反射呼叫服務端的方法,包括客戶端想要呼叫哪個類的哪個方法,引數是什麼等,然後進行對應的呼叫,將呼叫結果再通過資料包發回即可。