一起寫RPC框架(十一)RPC服務提供端三--服務的呼叫
上一個小節簡單的介紹了服務提供者端如何去編制一個服務的資訊,然後將此服務的資訊傳送到註冊中心上去的基本過程了,其實算是比較簡單的,這節我們將簡單的介紹一些Consumer端呼叫Provider端的時候,Provider端是如何處理的
我們先確定一下遠端呼叫的幾個引數:
首先先簡單地說明這幾個引數
1)invokeId,雖然在我這個RPC中並沒有實現多大的價值,但這是我的能力的原因,但這個引數卻是不可或缺的,因為有過遠端呼叫使用經歷的人,在線上出問題的時候,最最痛苦的就是定位問題了,而一般企業的業務都是鏈式呼叫,A系統呼叫B,B呼叫C,C呼叫D,E,往往排查問題的時候,某某介面呼叫不同的時候,都不知道哪裡出問題了,需要一步步地找相關的負責人一一確認,這個過程是很痛苦的,很有可能是跨部門的,不好協調,一個問題一個bug能需要半天的時間才能解決,這就是普通RPC所缺少的鏈路監控的功能,加入在這種鏈式呼叫的場景下,所有的呼叫日記,能夠按照這個invokeId做歸類的話,不管是用ElasticSearch還是Hbase,Mysql等等記錄方法,加上索引,有個監控系統,這樣就可以很簡單的找出問題的所在,是哪個環節出了問題了,這樣可以大大的加快排查問題的速度,很多S級網際網路公司都實現了這個功能,本人暫時沒有研究過,不過現在已經有很多開源了,大家可以自主調研,一起學習
2)serviceName,這個很好理解,在上幾個小節我們說過自定義Annotation綁定了某個具體的方法,所以一個serviceName是繫結一個方法的,獲取到serviceName,我們可以確定唯一的方法,因為遠端呼叫的本質還是呼叫某個方法
3)args,這個不用多說,呼叫方法的入參
4)timestamp呼叫的時間戳,這個時間應該在呼叫端的時候就形成了,一個遠端呼叫的時間統計應該是從請求發出和接收到響應,這個時間應該算是一個完整的呼叫流程
我們看具體的呼叫程式碼:
入參request中的bytes位元組數就是真正的請求體,我們對其進行反序列化獲取到真正的請求正文:public void handlerRPCRequest(RemotingTransporter request, Channel channel) { String serviceName = null; RequestCustomBody body = null; int requestSize = 0; try { byte[] bytes = request.bytes(); requestSize = bytes.length; request.bytes(null); body = serializerImpl().readObject(bytes, RequestCustomBody.class); request.setCustomHeader(body); serviceName = body.getServiceName(); ServiceMeterManager.incrementCallTimes(serviceName); ServiceMeterManager.incrementRequestSize(serviceName, requestSize); } catch (Exception e) { rejected(BAD_REQUEST, channel, request,serviceName); return; } final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName); if (pair == null || pair.getValue() == null) { rejected(SERVICE_NOT_FOUND, channel, request,serviceName); return; } // app flow control ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderController().getServiceFlowControllerManager(); if (!serviceFlowControllerManager.isAllow(serviceName)) { rejected(APP_FLOW_CONTROL,channel, request,serviceName); return; } process(pair,request,channel,serviceName,body.getTimestamp()); }
body = serializerImpl().readObject(bytes, RequestCustomBody.class);
反序列化得到的body裡面有serviceName,我們再根據serviceName獲取到真正的方法名:
我們在上個小節編織服務的時候,對ServiceWrapper進行了注入,知道serviceName與ServiceWrapper是一一對應的,lookupService方法也很簡單:final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName);
在DefaultServiceProviderContainer.java中維護了一個全域性變數的Map類,Key是serviceName,value是一個Pair的鍵值對,鍵值對的鍵值是CurrentServiceState.java
這個類表示的是當前服務例項,是對當前例項狀態的說明:
value是我們有方法名的ServiceWrapper的類,這個類以前有過說明,這邊做個簡單的截圖:
現在已經拿到了方法名,入參,接下來就是呼叫一些反射的API,就可以完成了方法的呼叫了:
private void process(Pair<CurrentServiceState, ServiceWrapper> pair, final RemotingTransporter request, Channel channel,final String serviceName,final long beginTime) {
Object invokeResult = null;
CurrentServiceState currentServiceState = pair.getKey();
ServiceWrapper serviceWrapper = pair.getValue();
Object targetCallObj = serviceWrapper.getServiceProvider();
Object[] args = ((RequestCustomBody)request.getCustomHeader()).getArgs();
if(currentServiceState.getHasDegrade().get() && serviceWrapper.getMockDegradeServiceProvider() != null){
targetCallObj = serviceWrapper.getMockDegradeServiceProvider();
}
String methodName = serviceWrapper.getMethodName();
List<Class<?>[]> parameterTypesList = serviceWrapper.getParamters();
Class<?>[] parameterTypes = findMatchingParameterTypes(parameterTypesList, args);
invokeResult = fastInvoke(targetCallObj, methodName, parameterTypes, args);
ResultWrapper result = new ResultWrapper();
result.setResult(invokeResult);
ResponseCustomBody body = new ResponseCustomBody(Status.OK.value(), result);
final RemotingTransporter response = RemotingTransporter.createResponseTransporter(LaopopoProtocol.RPC_RESPONSE, body, request.getOpaque());
channel.writeAndFlush(response).addListener(new ChannelFutureListener() {
public void operationComplete(ChannelFuture future) throws Exception {
long elapsed = SystemClock.millisClock().now() - beginTime;
logger.info("call time is [{}] and minus [{}]",beginTime,elapsed);
if (future.isSuccess()) {
ServiceMeterManager.incrementTotalTime(serviceName, elapsed);
} else {
logger.info("request {} get failed response {}", request, response);
}
}
});
}
這邊的程式碼也很好理解,獲取到引數,校驗引數的格式的正確性,然後去呼叫一些反射和Cglib的一些API就可以搞定了
public static Object fastInvoke(Object obj, String methodName, Class<?>[] parameterTypes, Object[] args) {
Class<?> clazz = obj.getClass();
FastClass fastClass = fastClassCache.get(clazz);
if (fastClass == null) {
FastClass newFastClass = FastClass.create(clazz);
fastClass = fastClassCache.putIfAbsent(clazz, newFastClass);
if (fastClass == null) {
fastClass = newFastClass;
}
}
Object value = null;
try {
value = fastClass.invoke(methodName, parameterTypes, obj, args);
} catch (InvocationTargetException e) {
JUnsafe.throwException(e);
}
return value;
}
獲取到呼叫的返回值之後,然後將其編織成返回值,然後用傳遞過來的channel將響應值返回到呼叫的Consumer的例項,整個流程基本是這樣的,大體上的呼叫流程就是這樣的,希望大家一起看程式碼,看看是否有bug,大家一起修正~原始碼地址:
https://github.com/BazingaLyn/laopopo-rpc/blob/master/laopopo-client/src/main/java/org/laopopo/client/provider/ProviderRPCController.java
下一節,我們看看如何做簡單的限流的