1. 程式人生 > 基於Netty的RPC簡單框架實現(二):RPC服務端







package com.maigo.rpc.context;

/**
 * A single RPC call: a request id, the name of the method to invoke and the
 * arguments to pass to it.
 */
public class RpcRequest {
	int id;
	String methodName;
	Object[] args;

	/**
	 * @param id         identifier of this request
	 * @param methodName name of the method to invoke
	 * @param args       arguments of the invocation; the array is stored as-is (not copied)
	 */
	public RpcRequest(int id, String methodName, Object[] args) {
		this.id = id;
		this.methodName = methodName;
		this.args = args;
	}

	/** @return the identifier of this request */
	public int getId() {
		return id;
	}

	/** @return the name of the method to invoke */
	public String getMethodName() {
		return methodName;
	}

	/** @return the same argument array that was passed to the constructor */
	public Object[] getArgs() {
		return args;
	}
}



package com.maigo.rpc.server;

import com.maigo.rpc.aop.RpcInvokeHook;

public class RpcServerBuilder 
	private Class<?> interfaceClass;
	private Object serviceProvider;
	private int port;	
	private int threads;
	private RpcInvokeHook rpcInvokeHook;
	public static RpcServerBuilder create()
		return new RpcServerBuilder();
	 * set the interface to provide service
	 * @param interfaceClass
	public RpcServerBuilder serviceInterface(Class<?> interfaceClass)
		this.interfaceClass = interfaceClass;
		return this;
	 * set the real object to provide service
	public RpcServerBuilder serviceProvider(Object serviceProvider)
		this.serviceProvider = serviceProvider;
		return this;
	 * set the port to bind
	public RpcServerBuilder bind(int port)
		this.port = port;
		return this;
	 * set the count of threads to handle request from client. (default availableProcessors)
	public RpcServerBuilder threads(int threadCount)
		this.threads = threadCount;	
		return this;
	 * set the hook of the method invoke in server
	public RpcServerBuilder hook(RpcInvokeHook rpcInvokeHook)
		this.rpcInvokeHook = rpcInvokeHook;	
		return this;
	public RpcServer build()
		if(threads <= 0)
			threads = Runtime.getRuntime().availableProcessors();
		RpcServer rpcServer = new RpcServer(interfaceClass, serviceProvider, port, 
				threads, rpcInvokeHook);
		return rpcServer;


package com.maigo.rpc.server;

import java.util.concurrent.atomic.AtomicInteger;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServer 
	private Class<?> interfaceClass;
	private Object serviceProvider;
	private int port;	
	private int threads;
	private RpcInvokeHook rpcInvokeHook;

	private RpcServerRequestHandler rpcServerRequestHandler;
	protected RpcServer(Class<?> interfaceClass, Object serviceProvider, int port, int threads,
			RpcInvokeHook rpcInvokeHook) 
		this.interfaceClass = interfaceClass;
		this.serviceProvider = serviceProvider;
		this.port = port;
		this.threads = threads;
		this.rpcInvokeHook = rpcInvokeHook;
		rpcServerRequestHandler = new RpcServerRequestHandler(interfaceClass, 
				serviceProvider, threads, rpcInvokeHook);
	public void start()
		System.out.println("bind port:"+port + " success!");
		//simulation for receive RpcRequest
		AtomicInteger idGenerator = new AtomicInteger(0);
		for(int i=0; i<10; i++)
			rpcServerRequestHandler.addRequest(new RpcRequest(idGenerator.addAndGet(1), 
					"testMethod01", new Object[]{"qwerty"}));
	public void stop()
		//TODO add stop codes here
		System.out.println("server stop success!");



package com.maigo.rpc.server;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServerRequestHandler 
	private Class<?> interfaceClass;
	private Object serviceProvider;
	private RpcInvokeHook rpcInvokeHook;
	private int threads;
	private ExecutorService threadPool;
	private BlockingQueue<RpcRequest> requestQueue = new LinkedBlockingQueue<RpcRequest>();
	public RpcServerRequestHandler(Class<?> interfaceClass,	Object serviceProvider, int threads,
			RpcInvokeHook rpcInvokeHook) 
		this.interfaceClass = interfaceClass;
		this.serviceProvider = serviceProvider;
		this.threads = threads;
		this.rpcInvokeHook = rpcInvokeHook;

	public void start()
		threadPool = Executors.newFixedThreadPool(threads);
		for(int i=0; i<threads; i++)
			threadPool.execute(new RpcServerRequestHandleRunnable(interfaceClass, 
					serviceProvider, rpcInvokeHook, requestQueue));
	public void addRequest(RpcRequest rpcRequest)
		catch (InterruptedException e) 



package com.maigo.rpc.server;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;

import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;

public class RpcServerRequestHandleRunnable implements Runnable
	private Class<?> interfaceClass;
	private Object serviceProvider;
	private RpcInvokeHook rpcInvokeHook;
	private BlockingQueue<RpcRequest> requestQueue;
	public RpcServerRequestHandleRunnable(Class<?> interfaceClass,
			Object serviceProvider, RpcInvokeHook rpcInvokeHook, 
			BlockingQueue<RpcRequest> requestQueue) 
		this.interfaceClass = interfaceClass;
		this.serviceProvider = serviceProvider;
		this.rpcInvokeHook = rpcInvokeHook;
		this.requestQueue = requestQueue;

	public void run() 
				RpcRequest rpcRequest = requestQueue.take();
				String methodName = rpcRequest.getMethodName();	
				Object[] args = rpcRequest.getArgs();
				int parameterCount = args.length;
				Method method = null;
				if(parameterCount > 0)
					Class<?>[] parameterTypes = new Class[args.length];
					for(int i=0; i<parameterCount; i++)
						parameterTypes[i] = args[i].getClass();
					method = interfaceClass.getMethod(methodName, parameterTypes);
					method = interfaceClass.getMethod(methodName);
				if(rpcInvokeHook != null)
					rpcInvokeHook.beforeInvoke(methodName, args);
				Object result = method.invoke(serviceProvider, args);
				System.out.println("Send response id = " + rpcRequest.getId() + " result = " + result 
						+ " back to client. " + Thread.currentThread());
				if(rpcInvokeHook != null)
					rpcInvokeHook.afterInvoke(methodName, args);
			catch (InterruptedException e) 
			catch (NoSuchMethodException e) 
				// TODO return NoSuchMethodException to client	
			catch (SecurityException e) 
			catch (IllegalAccessException e) 
				// TODO return IllegalAccessException to client	
			catch (IllegalArgumentException e) 
				// TODO return IllegalArgumentException to client	
			catch (InvocationTargetException e) 
				// TODO return Exception to client	



TestInterface testInterface = new TestInterface() 
	public String testMethod01(String string) 
		return string.toUpperCase();
RpcInvokeHook hook = new RpcInvokeHook() 
	public void beforeInvoke(String methodName, Object[] args) 
		System.out.println("beforeInvoke " + methodName);
	public void afterInvoke(String methodName, Object[] args) 
		System.out.println("afterInvoke " + methodName);
RpcServer rpcServer = RpcServerBuilder.create()
bind port:3721 success!
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 2 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
Send response id = 4 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
Send response id = 1 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 3 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
afterInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
Send response id = 5 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 6 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 10 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
afterInvoke testMethod01
Send response id = 9 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
afterInvoke testMethod01
Send response id = 7 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
afterInvoke testMethod01
Send response id = 8 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01