A Simple Netty-Based RPC Framework (Part 2): The RPC Server

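1. Design of the RPC Server

Compared with the client, the server is considerably simpler. The basic idea: when the RPC server is created, it sets up an RPC request queue and a fixed number of handler threads. Every handler thread holds the Class object of the service interface and the actual object on which methods are invoked (an implementation of that interface). Each thread simply keeps taking requests off the queue, invokes the requested method on the provider object, and sends the result back to the client through Netty.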

2. Implementation of the RPC Server

(1).RpcRequest

Before implementing the RPC server itself, first define the RpcRequest class.

package com.maigo.rpc.context;

public class RpcRequest 
{
	int id;
	String methodName;
	Object[] args;
	
	public RpcRequest(int id, String methodName, Object[] args) 
	{
		this.id = id;
		this.methodName = methodName;
		this.args = args;
	}

	public int getId() 
	{
		return id;
	}

	public String getMethodName() 
	{
		return methodName;
	}

	public Object[] getArgs() 
	{
		return args;
	}
}
RpcRequest represents one RPC call request. id distinguishes individual calls, methodName is the name of the method to invoke, and args holds its arguments.

(2).RpcServerBuilder

RpcServerBuilder is the factory class that creates an RpcServer.

package com.maigo.rpc.server;

import com.maigo.rpc.aop.RpcInvokeHook;

public class RpcServerBuilder 
{
	private Class<?> interfaceClass;
	private Object serviceProvider;
	
	private int port;	
	private int threads;
	private RpcInvokeHook rpcInvokeHook;
	
	public static RpcServerBuilder create()
	{
		return new RpcServerBuilder();
	}
	
	/**
	 * set the interface to provide service
	 * @param interfaceClass
	 */
	public RpcServerBuilder serviceInterface(Class<?> interfaceClass)
	{
		this.interfaceClass = interfaceClass;
		return this;
	}
	
	/**
	 * set the real object to provide service
	 */
	public RpcServerBuilder serviceProvider(Object serviceProvider)
	{
		this.serviceProvider = serviceProvider;
		return this;
	}
	
	/**
	 * set the port to bind
	 */
	public RpcServerBuilder bind(int port)
	{
		this.port = port;
		return this;
	}
	
	/**
	 * set the count of threads to handle request from client. (default availableProcessors)
	 */
	public RpcServerBuilder threads(int threadCount)
	{
		this.threads = threadCount;	
		return this;
	}
	
	/**
	 * set the hook of the method invoke in server
	 */
	public RpcServerBuilder hook(RpcInvokeHook rpcInvokeHook)
	{
		this.rpcInvokeHook = rpcInvokeHook;	
		return this;
	}
	
	public RpcServer build()
	{
		if(threads <= 0)
			threads = Runtime.getRuntime().availableProcessors();
				
		RpcServer rpcServer = new RpcServer(interfaceClass, serviceProvider, port, 
				threads, rpcInvokeHook);
		
		return rpcServer;
	}
}
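The API is straightforward: create() creates the builder, serviceInterface() sets the service interface, serviceProvider() sets the object that actually serves the calls, bind() sets the port to bind, threads() sets the number of handler threads (defaulting to the number of available processors), hook() sets the callbacks run before and after each invocation, and build() creates the RpcServer object.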

(3).RpcServer

package com.maigo.rpc.server;

import java.util.concurrent.atomic.AtomicInteger;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServer 
{
	private Class<?> interfaceClass;
	private Object serviceProvider;
	
	private int port;	
	private int threads;
	private RpcInvokeHook rpcInvokeHook;

	private RpcServerRequestHandler rpcServerRequestHandler;
	
	protected RpcServer(Class<?> interfaceClass, Object serviceProvider, int port, int threads,
			RpcInvokeHook rpcInvokeHook) 
	{	
		this.interfaceClass = interfaceClass;
		this.serviceProvider = serviceProvider;
		this.port = port;
		this.threads = threads;
		this.rpcInvokeHook = rpcInvokeHook;
		
		rpcServerRequestHandler = new RpcServerRequestHandler(interfaceClass, 
				serviceProvider, threads, rpcInvokeHook);
		rpcServerRequestHandler.start();
	}	
	
	public void start()
	{
		System.out.println("bind port:"+port + " success!");
		
		//simulation for receive RpcRequest
		AtomicInteger idGenerator = new AtomicInteger(0);
		for(int i=0; i<10; i++)
		{
			rpcServerRequestHandler.addRequest(new RpcRequest(idGenerator.addAndGet(1), 
					"testMethod01", new Object[]{"qwerty"}));
		}
	}
	
	public void stop()
	{
		//TODO add stop codes here
		System.out.println("server stop success!");
	}
}
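RpcServer exposes only start() and stop() for starting and stopping the RPC service. Because both involve the network layer, they just print a message for now. start() also simulates receiving RpcRequest objects, so the server can be tested without a network connection. The RpcServer constructor creates an RpcServerRequestHandler, which is dedicated to processing RpcRequest objects, and starts it.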


(4).RpcServerRequestHandler

The class dedicated to processing RpcRequest objects.

package com.maigo.rpc.server;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServerRequestHandler 
{
	private Class<?> interfaceClass;
	private Object serviceProvider;
	private RpcInvokeHook rpcInvokeHook;
	
	private int threads;
	private ExecutorService threadPool;
	private BlockingQueue<RpcRequest> requestQueue = new LinkedBlockingQueue<RpcRequest>();
	
	public RpcServerRequestHandler(Class<?> interfaceClass,	Object serviceProvider, int threads,
			RpcInvokeHook rpcInvokeHook) 
	{
		this.interfaceClass = interfaceClass;
		this.serviceProvider = serviceProvider;
		this.threads = threads;
		this.rpcInvokeHook = rpcInvokeHook;
	}

	public void start()
	{
		threadPool = Executors.newFixedThreadPool(threads);
		for(int i=0; i<threads; i++)
		{
			threadPool.execute(new RpcServerRequestHandleRunnable(interfaceClass, 
					serviceProvider, rpcInvokeHook, requestQueue));
		}
	}
	
	public void addRequest(RpcRequest rpcRequest)
	{
		try 
		{
			requestQueue.put(rpcRequest);
		} 
		catch (InterruptedException e) 
		{
			e.printStackTrace();
		}
	}
}
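The start() method of RpcServerRequestHandler creates one thread pool of size threads and submits threads instances of RpcServerRequestHandleRunnable to it (the constructor only stores the fields). Every RpcServerRequestHandleRunnable holds the same service interface interfaceClass, which states which services the server offers, the same provider object serviceProvider, on which the methods are actually invoked, and the same request queue requestQueue, from which incoming call requests are taken.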
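RpcServer.stop() is still a TODO at this stage. As a minimal sketch of how the handler threads could be shut down, the example below uses a hypothetical stand-in class, StoppableHandlerSketch, in which requests are plain strings and "handling" is just a counter. The class name, stop(), handledCount() and demo() are assumptions made for illustration and are not part of the article's code. It relies on ExecutorService.shutdownNow() interrupting workers blocked in take().

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for RpcServerRequestHandler; requests are plain strings here.
class StoppableHandlerSketch
{
    private final BlockingQueue<String> requestQueue = new LinkedBlockingQueue<String>();
    private final AtomicInteger handled = new AtomicInteger();
    private final int threads;
    private ExecutorService threadPool;

    StoppableHandlerSketch(int threads)
    {
        this.threads = threads;
    }

    void start()
    {
        threadPool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++)
            threadPool.execute(this::handleLoop);
    }

    private void handleLoop()
    {
        try
        {
            while (true)
            {
                requestQueue.take();        // blocks; throws InterruptedException once interrupted
                handled.incrementAndGet();  // stands in for the reflective method invocation
            }
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt(); // restore the flag and let the worker exit
        }
    }

    void addRequest(String request)
    {
        requestQueue.offer(request); // the queue is unbounded, so offer() always succeeds
    }

    /** Interrupts all workers and waits for them; returns true if the pool terminated in time. */
    boolean stop(long timeoutMillis)
    {
        threadPool.shutdownNow();
        try
        {
            return threadPool.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e)
        {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    int handledCount()
    {
        return handled.get();
    }

    /** Handles three requests, then stops; true if all were handled and the pool terminated. */
    static boolean demo()
    {
        StoppableHandlerSketch handler = new StoppableHandlerSketch(2);
        handler.start();
        for (int i = 0; i < 3; i++)
            handler.addRequest("request-" + i);
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
        while (handler.handledCount() < 3 && System.nanoTime() < deadline)
            Thread.yield();
        boolean stopped = handler.stop(1000);
        return handler.handledCount() == 3 && stopped;
    }

    public static void main(String[] unused)
    {
        System.out.println(demo());
    }
}
```

Requests still sitting in the queue when stop() is called are simply dropped in this sketch. Whether to drain them first is a design decision the real server would have to make.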

(5).RpcServerRequestHandleRunnable

The class that actually executes method call requests.

package com.maigo.rpc.server;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServerRequestHandleRunnable implements Runnable
{
	private Class<?> interfaceClass;
	private Object serviceProvider;
	private RpcInvokeHook rpcInvokeHook;
	private BlockingQueue<RpcRequest> requestQueue;
		
	public RpcServerRequestHandleRunnable(Class<?> interfaceClass,
			Object serviceProvider, RpcInvokeHook rpcInvokeHook, 
			BlockingQueue<RpcRequest> requestQueue) 
	{
		this.interfaceClass = interfaceClass;
		this.serviceProvider = serviceProvider;
		this.rpcInvokeHook = rpcInvokeHook;
		this.requestQueue = requestQueue;
	}

	public void run() 
	{	
		while(true)
		{
			try 
			{
				RpcRequest rpcRequest = requestQueue.take();
				
				String methodName = rpcRequest.getMethodName();	
				Object[] args = rpcRequest.getArgs();
								
				int parameterCount = args.length;
				Method method = null;
				if(parameterCount > 0)
				{
					Class<?>[] parameterTypes = new Class[args.length];
					for(int i=0; i<parameterCount; i++)
					{
						parameterTypes[i] = args[i].getClass();
					}
					method = interfaceClass.getMethod(methodName, parameterTypes);
				}
				else
				{
					method = interfaceClass.getMethod(methodName);
				}
					
				if(rpcInvokeHook != null)
					rpcInvokeHook.beforeInvoke(methodName, args);
				
				Object result = method.invoke(serviceProvider, args);
				System.out.println("Send response id = " + rpcRequest.getId() + " result = " + result 
						+ " back to client. " + Thread.currentThread());
				
				if(rpcInvokeHook != null)
					rpcInvokeHook.afterInvoke(methodName, args);
			} 
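			catch (InterruptedException e) 
			{
				// restore the interrupt flag and leave the loop so the thread can terminate
				Thread.currentThread().interrupt();
				return;
			} 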
			catch (NoSuchMethodException e) 
			{
				// TODO return NoSuchMethodException to client	
				e.printStackTrace();
			} 
			catch (SecurityException e) 
			{
				e.printStackTrace();
			} 
			catch (IllegalAccessException e) 
			{
				// TODO return IllegalAccessException to client	
				e.printStackTrace();
			} 
			catch (IllegalArgumentException e) 
			{
				// TODO return IllegalArgumentException to client	
				e.printStackTrace();
			} 
			catch (InvocationTargetException e) 
			{
				// TODO return Exception to client	
				e.printStackTrace();
			}
		}
	}
}
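RpcServerRequestHandleRunnable keeps taking RpcRequest objects off the request queue requestQueue, invokes the requested method on serviceProvider, and returns the result to the client. Since the network layer has not been added yet, returning the result is replaced by a print statement for now. The hook callbacks run immediately before and after the actual invocation.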
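One limitation of the lookup in run() is worth noting: getMethod() is called with the runtime classes of the arguments, so it only finds methods whose declared parameter types are exactly those classes. A method declared with an interface type such as List, or with a primitive such as int, is not found. A possible workaround is to match by name and by whether each declared parameter type accepts the argument. The sketch below is an illustration, not the article's implementation. LenientMethodLookup, findMethod() and the demo interface Counter are hypothetical names, and with overloaded methods it simply returns the first match.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the article's code.
class LenientMethodLookup
{
    /** Finds a public method by name whose declared parameter types accept the given arguments. */
    static Method findMethod(Class<?> type, String name, Object[] args) throws NoSuchMethodException
    {
        int argCount = (args == null) ? 0 : args.length;
        for (Method candidate : type.getMethods())
        {
            if (!candidate.getName().equals(name) || candidate.getParameterCount() != argCount)
                continue;
            Class<?>[] parameterTypes = candidate.getParameterTypes();
            boolean matches = true;
            for (int i = 0; i < argCount && matches; i++)
                matches = accepts(parameterTypes[i], args[i]);
            if (matches)
                return candidate;
        }
        throw new NoSuchMethodException(type.getName() + "." + name);
    }

    private static boolean accepts(Class<?> parameterType, Object arg)
    {
        if (arg == null)
            return !parameterType.isPrimitive(); // null fits any reference type
        return box(parameterType).isInstance(arg);
    }

    /** Maps a primitive type to its wrapper so boxed arguments can be matched against it. */
    private static Class<?> box(Class<?> type)
    {
        if (type == int.class) return Integer.class;
        if (type == long.class) return Long.class;
        if (type == double.class) return Double.class;
        if (type == float.class) return Float.class;
        if (type == boolean.class) return Boolean.class;
        if (type == char.class) return Character.class;
        if (type == byte.class) return Byte.class;
        if (type == short.class) return Short.class;
        return type;
    }

    /** Demo interface whose parameters are an interface type and a primitive. */
    interface Counter
    {
        int count(List<String> items, int extra);
    }

    /** Returns true if an exact lookup by the arguments' runtime classes finds count(). */
    static boolean exactLookupWorks(Object[] args)
    {
        try
        {
            Class<?>[] types = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++)
                types[i] = args[i].getClass();
            Counter.class.getMethod("count", types);
            return true;
        }
        catch (NoSuchMethodException e)
        {
            return false;
        }
    }

    /** Looks up count() leniently and invokes it; reflective failures become unchecked. */
    static Object lenientInvoke(Counter target, Object[] args)
    {
        try
        {
            return findMethod(Counter.class, "count", args).invoke(target, args);
        }
        catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e)
        {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] unused)
    {
        Counter counter = (items, extra) -> items.size() + extra;
        Object[] callArgs = { new ArrayList<String>(Arrays.asList("a", "b")), 1 };
        System.out.println(exactLookupWorks(callArgs));       // false: no count(ArrayList, Integer)
        System.out.println(lenientInvoke(counter, callArgs)); // 3
    }
}
```

In the demo the exact lookup fails because the arguments' runtime classes are ArrayList and Integer while the method is declared as count(List, int). The lenient lookup finds the method, and invoke() unboxes the Integer automatically.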

3. Testing

RpcServer.start() simulates receiving 10 call requests, so the following test can run without a client:

TestInterface testInterface = new TestInterface() 
{			
	public String testMethod01(String string) 
	{
		return string.toUpperCase();
	}
};
	
RpcInvokeHook hook = new RpcInvokeHook() 
{			
	public void beforeInvoke(String methodName, Object[] args) 
	{
		System.out.println("beforeInvoke " + methodName);
	}
	
	public void afterInvoke(String methodName, Object[] args) 
	{
		System.out.println("afterInvoke " + methodName);
	}
};
		
RpcServer rpcServer = RpcServerBuilder.create()
                                      .serviceInterface(TestInterface.class)
                                      .serviceProvider(testInterface)
                                      .threads(4)
                                      .hook(hook)
                                      .bind(3721)
                                      .build();
rpcServer.start();
The output is:
bind port:3721 success!
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 2 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
Send response id = 4 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
Send response id = 1 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 3 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
afterInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
Send response id = 5 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 6 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 10 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
afterInvoke testMethod01
Send response id = 9 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
afterInvoke testMethod01
Send response id = 7 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
afterInvoke testMethod01
Send response id = 8 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
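As the output shows, four handler threads are at work, each of them invoked the requested method correctly, and the configured hook was called back as expected. The exact interleaving of the lines varies from run to run because the threads execute concurrently.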