基於Netty的RPC簡單框架實現(二):RPC服務端
阿新 • • 發佈:2019-01-10
1.RPC服務端的實現思路
相對於客戶端而言,服務端要簡單不少。基本思想就是,建立RPC服務端的時候,建立一個RPC請求佇列和一定數量的Handler執行緒。Handler執行緒都持有服務端提供服務的Interface的類型別和實際供方法呼叫的物件(實現了提供服務的Interface),各執行緒只需要不斷從RPC請求佇列中取出請求,然後用供方法呼叫的物件來呼叫所請求的方法,最後將呼叫的結果通過Netty傳送回客戶端即可。
2.RPC服務端的具體實現
(1).RpcRequest
在具體實現RPC服務端之前,先定義RpcRequest類。
RpcRequest表示了一個RPC呼叫請求。id用於區分多次不同的呼叫,methodName為請求呼叫的方法名,args為引數。package com.maigo.rpc.context; public class RpcRequest { int id; String methodName; Object[] args; public RpcRequest(int id, String methodName, Object[] args) { this.id = id; this.methodName = methodName; this.args = args; } public int getId() { return id; } public String getMethodName() { return methodName; } public Object[] getArgs() { return args; } }
(2).RpcServerBuilder
RpcServerBuilder是建立RpcServer的工廠類
API都很簡單,create()建立工場,serviceInterface()設定服務介面,serviceProvider()設定供方法呼叫的實際物件,bind()設定繫結的埠號,threads()設定Handler執行緒的個數(預設為CPU核數),build()創建出RpcServer物件。package com.maigo.rpc.server; import com.maigo.rpc.aop.RpcInvokeHook; public class RpcServerBuilder { private Class<?> interfaceClass; private Object serviceProvider; private int port; private int threads; private RpcInvokeHook rpcInvokeHook; public static RpcServerBuilder create() { return new RpcServerBuilder(); } /** * set the interface to provide service * @param interfaceClass */ public RpcServerBuilder serviceInterface(Class<?> interfaceClass) { this.interfaceClass = interfaceClass; return this; } /** * set the real object to provide service */ public RpcServerBuilder serviceProvider(Object serviceProvider) { this.serviceProvider = serviceProvider; return this; } /** * set the port to bind */ public RpcServerBuilder bind(int port) { this.port = port; return this; } /** * set the count of threads to handle request from client. (default availableProcessors) */ public RpcServerBuilder threads(int threadCount) { this.threads = threadCount; return this; } /** * set the hook of the method invoke in server */ public RpcServerBuilder hook(RpcInvokeHook rpcInvokeHook) { this.rpcInvokeHook = rpcInvokeHook; return this; } public RpcServer build() { if(threads <= 0) threads = Runtime.getRuntime().availableProcessors(); RpcServer rpcServer = new RpcServer(interfaceClass, serviceProvider, port, threads, rpcInvokeHook); return rpcServer; } }
(3).RpcServer
RpcServer只提供了start()和stop()方法用於啟動和停止RPC服務。由於啟動和停止要涉及網路部分,現在先用列印輸出代替。start()方法中還模擬了收到RpcRequest的情況,用於當前無網路連線的情況下測試。RpcServer的構造方法中建立了一個RpcServerRequestHandler,專門用於處理RpcRequest。package com.maigo.rpc.server; import java.util.concurrent.atomic.AtomicInteger; import com.maigo.rpc.aop.RpcInvokeHook; import com.maigo.rpc.context.RpcRequest; public class RpcServer { private Class<?> interfaceClass; private Object serviceProvider; private int port; private int threads; private RpcInvokeHook rpcInvokeHook; private RpcServerRequestHandler rpcServerRequestHandler; protected RpcServer(Class<?> interfaceClass, Object serviceProvider, int port, int threads, RpcInvokeHook rpcInvokeHook) { this.interfaceClass = interfaceClass; this.serviceProvider = serviceProvider; this.port = port; this.threads = threads; this.rpcInvokeHook = rpcInvokeHook; rpcServerRequestHandler = new RpcServerRequestHandler(interfaceClass, serviceProvider, threads, rpcInvokeHook); rpcServerRequestHandler.start(); } public void start() { System.out.println("bind port:"+port + " success!"); //simulation for receive RpcRequest AtomicInteger idGenerator = new AtomicInteger(0); for(int i=0; i<10; i++) { rpcServerRequestHandler.addRequest(new RpcRequest(idGenerator.addAndGet(1), "testMethod01", new Object[]{"qwerty"})); } } public void stop() { //TODO add stop codes here System.out.println("server stop success!"); } }
(4).RpcServerRequestHandler
專門用於處理RpcRequest的類
package com.maigo.rpc.server;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;
public class RpcServerRequestHandler
{
private Class<?> interfaceClass;
private Object serviceProvider;
private RpcInvokeHook rpcInvokeHook;
private int threads;
private ExecutorService threadPool;
private BlockingQueue<RpcRequest> requestQueue = new LinkedBlockingQueue<RpcRequest>();
public RpcServerRequestHandler(Class<?> interfaceClass, Object serviceProvider, int threads,
RpcInvokeHook rpcInvokeHook)
{
this.interfaceClass = interfaceClass;
this.serviceProvider = serviceProvider;
this.threads = threads;
this.rpcInvokeHook = rpcInvokeHook;
}
public void start()
{
threadPool = Executors.newFixedThreadPool(threads);
for(int i=0; i<threads; i++)
{
threadPool.execute(new RpcServerRequestHandleRunnable(interfaceClass,
serviceProvider, rpcInvokeHook, requestQueue));
}
}
public void addRequest(RpcRequest rpcRequest)
{
try
{
requestQueue.put(rpcRequest);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
}
在RpcServerRequestHandler的構造方法中,建立了1個大小為threads的執行緒池,並讓其運行了threads個RpcServerRequestHandleRunnable。每個RpcServerRequestHandleRunnable持有相同的服務介面interfaceClass表示服務端提供哪些服務,相同的服務提供物件serviceProvider供實際方法呼叫,相同的請求佇列requestQueue用於取出收到的方法呼叫請求。
(5).RpcServerRequestHandleRunnable
方法呼叫請求的實際執行者
package com.maigo.rpc.server;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.BlockingQueue;
import com.maigo.rpc.aop.RpcInvokeHook;
import com.maigo.rpc.context.RpcRequest;
public class RpcServerRequestHandleRunnable implements Runnable
{
private Class<?> interfaceClass;
private Object serviceProvider;
private RpcInvokeHook rpcInvokeHook;
private BlockingQueue<RpcRequest> requestQueue;
public RpcServerRequestHandleRunnable(Class<?> interfaceClass,
Object serviceProvider, RpcInvokeHook rpcInvokeHook,
BlockingQueue<RpcRequest> requestQueue)
{
this.interfaceClass = interfaceClass;
this.serviceProvider = serviceProvider;
this.rpcInvokeHook = rpcInvokeHook;
this.requestQueue = requestQueue;
}
public void run()
{
while(true)
{
try
{
RpcRequest rpcRequest = requestQueue.take();
String methodName = rpcRequest.getMethodName();
Object[] args = rpcRequest.getArgs();
int parameterCount = args.length;
Method method = null;
if(parameterCount > 0)
{
Class<?>[] parameterTypes = new Class[args.length];
for(int i=0; i<parameterCount; i++)
{
parameterTypes[i] = args[i].getClass();
}
method = interfaceClass.getMethod(methodName, parameterTypes);
}
else
{
method = interfaceClass.getMethod(methodName);
}
if(rpcInvokeHook != null)
rpcInvokeHook.beforeInvoke(methodName, args);
Object result = method.invoke(serviceProvider, args);
System.out.println("Send response id = " + rpcRequest.getId() + " result = " + result
+ " back to client. " + Thread.currentThread());
if(rpcInvokeHook != null)
rpcInvokeHook.afterInvoke(methodName, args);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
catch (NoSuchMethodException e)
{
// TODO return NoSuchMethodException to client
e.printStackTrace();
}
catch (SecurityException e)
{
e.printStackTrace();
}
catch (IllegalAccessException e)
{
// TODO return IllegalAccessException to client
e.printStackTrace();
}
catch (IllegalArgumentException e)
{
// TODO return IllegalArgumentException to client
e.printStackTrace();
}
catch (InvocationTargetException e)
{
// TODO return Exception to client
e.printStackTrace();
}
}
}
}
RpcServerRequestHandleRunnable不斷地從請求佇列requestQueue中取出方法呼叫請求RpcRequest,用serviceProvider呼叫請求的方法並向客戶端返回呼叫結果。由於現在還未加入網路部分,向客戶端返回結果暫時先用列印輸出代替。在方法實際呼叫的前後,鉤子Hook的回撥得到了執行。
3.測試
RpcServer中的start()方法模擬了收到10個呼叫請求的情況
TestInterface testInterface = new TestInterface()
{
public String testMethod01(String string)
{
return string.toUpperCase();
}
};
RpcInvokeHook hook = new RpcInvokeHook()
{
public void beforeInvoke(String methodName, Object[] args)
{
System.out.println("beforeInvoke " + methodName);
}
public void afterInvoke(String methodName, Object[] args)
{
System.out.println("afterInvoke " + methodName);
}
};
RpcServer rpcServer = RpcServerBuilder.create()
.serviceInterface(TestInterface.class)
.serviceProvider(testInterface)
.threads(4)
.hook(hook)
.bind(3721)
.build();
rpcServer.start();
輸出結果為:bind port:3721 success!
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 2 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
Send response id = 4 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
Send response id = 1 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 3 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
afterInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
Send response id = 5 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
Send response id = 6 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
afterInvoke testMethod01
beforeInvoke testMethod01
beforeInvoke testMethod01
Send response id = 10 result = QWERTY back to client. Thread[pool-1-thread-1,5,main]
afterInvoke testMethod01
Send response id = 9 result = QWERTY back to client. Thread[pool-1-thread-4,5,main]
afterInvoke testMethod01
Send response id = 7 result = QWERTY back to client. Thread[pool-1-thread-2,5,main]
afterInvoke testMethod01
Send response id = 8 result = QWERTY back to client. Thread[pool-1-thread-3,5,main]
afterInvoke testMethod01
可見,共有4個Handler執行緒在工作,並且都正確的呼叫了被請求的方法,設定的Hook也受到了正確的回撥。