1. 程式人生 > >一起寫RPC框架(十一)RPC服務提供端三--服務的呼叫

一起寫RPC框架(十一)RPC服務提供端三--服務的呼叫

上一個小節簡單的介紹了服務提供者端如何去編制一個服務的資訊,然後將此服務的資訊傳送到註冊中心上去的基本過程了,其實算是比較簡單的,這節我們將簡單的介紹一些Consumer端呼叫Provider端的時候,Provider端是如何處理的

我們先確定一下遠端呼叫的幾個引數:


首先先簡單地說明這幾個引數

1)invokeId,雖然在我這個RPC中並沒有實現多大的價值,但這是我的能力的原因,但這個引數卻是不可或缺的,因為有過遠端呼叫使用經歷的人,在線上出問題的時候,最最痛苦的就是定位問題了,而一般企業的業務都是鏈式呼叫,A系統呼叫B,B呼叫C,C呼叫D,E,往往排查問題的時候,某某介面呼叫不同的時候,都不知道哪裡出問題了,需要一步步地找相關的負責人一一確認,這個過程是很痛苦的,很有可能是跨部門的,不好協調,一個問題一個bug能需要半天的時間才能解決,這就是普通RPC所缺少的鏈路監控的功能,加入在這種鏈式呼叫的場景下,所有的呼叫日記,能夠按照這個invokeId做歸類的話,不管是用ElasticSearch還是Hbase,Mysql等等記錄方法,加上索引,有個監控系統,這樣就可以很簡單的找出問題的所在,是哪個環節出了問題了,這樣可以大大的加快排查問題的速度,很多S級網際網路公司都實現了這個功能,本人暫時沒有研究過,不過現在已經有很多開源了,大家可以自主調研,一起學習

2)serviceName,這個很好理解,在上幾個小節我們說過自定義Annotation綁定了某個具體的方法,所以一個serviceName是繫結一個方法的,獲取到serviceName,我們可以確定唯一的方法,因為遠端呼叫的本質還是呼叫某個方法

3)args,這個不用多說,呼叫方法的入參

4)timestamp呼叫的時間戳,這個時間應該在呼叫端的時候就形成了,一個遠端呼叫的時間統計應該是從請求發出和接收到響應,這個時間應該算是一個完整的呼叫流程

我們看具體的呼叫程式碼:

public void handlerRPCRequest(RemotingTransporter request, Channel channel) {
		
		
		String serviceName = null;
		RequestCustomBody body = null;
		int requestSize = 0;

		try {
			byte[] bytes = request.bytes();
			requestSize = bytes.length;
			request.bytes(null);

			body = serializerImpl().readObject(bytes, RequestCustomBody.class);
			
			request.setCustomHeader(body);
			serviceName = body.getServiceName();
			
			
			ServiceMeterManager.incrementCallTimes(serviceName);
			ServiceMeterManager.incrementRequestSize(serviceName, requestSize);
			
		} catch (Exception e) {
			rejected(BAD_REQUEST, channel, request,serviceName);
			return;
		}
		
		final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName);
		if (pair == null || pair.getValue() == null) {
            rejected(SERVICE_NOT_FOUND, channel, request,serviceName);
            return;
        }
		
		// app flow control
        ServiceFlowControllerManager serviceFlowControllerManager = defaultProvider.getProviderController().getServiceFlowControllerManager();
        if (!serviceFlowControllerManager.isAllow(serviceName)) {
            rejected(APP_FLOW_CONTROL,channel, request,serviceName);
            return;
        }
        
        process(pair,request,channel,serviceName,body.getTimestamp());
	}
入參request中的bytes位元組數就是真正的請求體,我們對其進行反序列化獲取到真正的請求正文:
body = serializerImpl().readObject(bytes, RequestCustomBody.class);
反序列化得到的body裡面有serviceName,我們再根據serviceName獲取到真正的方法名:
final Pair<CurrentServiceState, ServiceWrapper> pair = defaultProvider.getProviderController().getProviderContainer().lookupService(serviceName);
我們在上個小節編織服務的時候,對ServiceWrapper進行了注入,知道serviceName與ServiceWrapper是一一對應的,lookupService方法也很簡單:


在DefaultServiceProviderContainer.java中維護了一個全域性變數的Map類,Key是serviceName,value是一個Pair的鍵值對,鍵值對的鍵值是CurrentServiceState.java

這個類表示的是當前服務例項,是對當前例項狀態的說明:

value是我們有方法名的ServiceWrapper的類,這個類以前有過說明,這邊做個簡單的截圖:


現在已經拿到了方法名,入參,接下來就是呼叫一些反射的API,就可以完成了方法的呼叫了:

private void process(Pair<CurrentServiceState, ServiceWrapper> pair, final RemotingTransporter request, Channel channel,final String serviceName,final long beginTime) {
		
		Object invokeResult = null;
		
		CurrentServiceState currentServiceState = pair.getKey();
		ServiceWrapper serviceWrapper = pair.getValue();
		
		Object targetCallObj = serviceWrapper.getServiceProvider();
		
		Object[] args = ((RequestCustomBody)request.getCustomHeader()).getArgs();
		
		if(currentServiceState.getHasDegrade().get() && serviceWrapper.getMockDegradeServiceProvider() != null){
			targetCallObj = serviceWrapper.getMockDegradeServiceProvider();
		}
		
		String methodName = serviceWrapper.getMethodName();
		List<Class<?>[]> parameterTypesList = serviceWrapper.getParamters();
		
		
		Class<?>[] parameterTypes = findMatchingParameterTypes(parameterTypesList, args);
		invokeResult = fastInvoke(targetCallObj, methodName, parameterTypes, args);
		
		ResultWrapper result = new ResultWrapper();
		result.setResult(invokeResult);
		ResponseCustomBody body = new ResponseCustomBody(Status.OK.value(), result);
		
		final RemotingTransporter response = RemotingTransporter.createResponseTransporter(LaopopoProtocol.RPC_RESPONSE, body, request.getOpaque());
		
		channel.writeAndFlush(response).addListener(new ChannelFutureListener() {

			public void operationComplete(ChannelFuture future) throws Exception {
				
				long elapsed = SystemClock.millisClock().now() - beginTime;
				
				logger.info("call time is [{}]  and minus [{}]",beginTime,elapsed);
				if (future.isSuccess()) {
					
					ServiceMeterManager.incrementTotalTime(serviceName, elapsed);
				} else {
					logger.info("request {} get failed response {}", request, response);
				}
			}
		});
		
	}
這邊的程式碼也很好理解,獲取到引數,校驗引數的格式的正確性,然後去呼叫一些反射和Cglib的一些API就可以搞定了
 public static Object fastInvoke(Object obj, String methodName, Class<?>[] parameterTypes, Object[] args) {
        Class<?> clazz = obj.getClass();
        FastClass fastClass = fastClassCache.get(clazz);
        if (fastClass == null) {
            FastClass newFastClass = FastClass.create(clazz);
            fastClass = fastClassCache.putIfAbsent(clazz, newFastClass);
            if (fastClass == null) {
                fastClass = newFastClass;
            }
        }

        Object value = null;
        try {
            value = fastClass.invoke(methodName, parameterTypes, obj, args);
        } catch (InvocationTargetException e) {
            JUnsafe.throwException(e);
            
        }
        return value;
    }
獲取到呼叫的返回值之後,然後將其編織成返回值,然後用傳遞過來的channel將響應值返回到呼叫的Consumer的例項,整個流程基本是這樣的,大體上的呼叫流程就是這樣的,希望大家一起看程式碼,看看是否有bug,大家一起修正~

原始碼地址:

https://github.com/BazingaLyn/laopopo-rpc/blob/master/laopopo-client/src/main/java/org/laopopo/client/provider/ProviderRPCController.java

下一節,我們看看如何做簡單的限流的