實現一個簡單的RPC框架
國慶期間閒來無事,看到掘金上一篇文章《 ofollow,noindex">徒手擼框架--實現 RPC 遠端呼叫 》,覺得寫的很不錯,也推薦大家閱讀一下。於是自己也趁機實現了一個簡單的RPC框架,與他不同的是,我使用了 etcd-io/etcd" target="_blank" rel="nofollow,noindex">etcd 作為註冊中心來實現服務註冊與服務發現的功能。具體的內容請看下文~~。
先附上Github : https://github.com/AlexZFX/easyrpc
整體概述
先看一下一個簡單的RPC的呼叫流程

rpc呼叫流程.png
- Server端的服務註冊
- Client端獲取服務提供者的資訊,儲存在本地,定時更新
- Client收到請求,對提供相應服務的Server發起請求
- Server通過反射呼叫本地的服務類的方法,將結果或出現的異常返回。
那我們在寫一個RPC框架時,應該考慮的問題就應該包括以下幾點
- 服務註冊與發現
- Client端服務的代理
- 請求的傳送與處理
- Server端方法的呼叫與結果返回
這四點內部又有一些細節要處理,下面我會對這幾點進行描述,並給出我自己的實現。
服務註冊與發現
對於客戶端和服務端來說,我們希望我們提供的服務是非侵入式的,也就是對客戶端或者服務端本身的服務程式碼無影響。而這樣最便捷的方式便是通過註解來實現。
於是我定義了兩個註解 @RpcInterface 和 @RpcService
註解定義如下
@RpcInterface
/** * Description : 註解於實現的介面類上,表示該類是用於使用遠端 rpc服務的 class,其中的method都會通過動態代理呼叫到遠端的服務端 */ //註解的宣告週期為始終不會丟棄 @Retention(RetentionPolicy.RUNTIME) //註解的使用地點為 類,介面或enum宣告 @Target(ElementType.TYPE) @Documented public @interface RpcInterface { }
@RpcService
/** * Description : 註解於實現了介面的服務類上,表示該類是用於提供rpc服務的 class,其中的method都會被註冊到etcd中 */ @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented public @interface RpcService { }
註解的作用和解釋寫在了註釋裡,有了這兩個註解之後,我們就需要對註解進行處理, @RpcService 對應將相應服務介面名註冊在etcd上, @RpcInterface 對應查詢註冊中心中的服務名,在本地通過動態代理將本地的方法的結果修改為遠端呼叫得到的結果。
服務的註冊和發現通過一個封裝好的 EtcdRegistry 來實現,這裡的程式碼主要參考了 阿里第四屆中介軟體效能大賽初賽 的服務註冊發現程式碼,只做了一點點小的修改。
完整內容還是看Github。
Register方法和Find方法的內容如下~
//註冊類名,一個類對應一個client @Override public void register(String serviceName, int port) throws Exception { String strKey = MessageFormat.format("/{0}/{1}/{2}:{3}", ROOTPATH, serviceName, getHostIp(), String.valueOf(port)); ByteSequence key = ByteSequence.fromString(strKey); // 目前只需要建立這個key,對應的value暫不使用,先留空 ByteSequence val = ByteSequence.fromString(""); //等待put結束之後繼續執行 kv.put(key, val, PutOption.newBuilder().withLeaseId(leaseId).build()).get(); log.info("Register a new service at :" + strKey); } private String getHostIp() throws UnknownHostException { return Inet4Address.getLocalHost().getHostAddress(); }
@Override public List<EndPoint> find(String serviceName) throws Exception { String strkey = MessageFormat.format("/{0}/{1}", ROOTPATH, serviceName); log.info("start to find service, Name :" + strkey); ByteSequence key = ByteSequence.fromString(strkey); GetResponse response = kv.get(key, GetOption.newBuilder().withPrefix(key).build()).get(); List<EndPoint> list = new ArrayList<>(); response.getKvs().forEach(kv -> { String s = kv.getKey().toStringUtf8(); int index = s.lastIndexOf("/"); String endPointStr = s.substring(index + 1, s.length()); String host = endPointStr.split(":")[0]; int post = Integer.parseInt(endPointStr.split(":")[1]); list.add(new EndPoint(host, post)); }); return list; }
對註解的處理方面,為了方便,我沒有自己寫一整套掃描包獲取註解的方法,而是使用了Java的開源反射框架 Reflections ,簡單的使用如下。
Reflections reflections = new Reflections(packagePath); Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcService.class);
這樣我們就獲取到的標識了相應註解的類。
在微服務中,我們往往通過實現同一個介面來保證客戶端方法和服務端方法的同步,所以我們在註冊過程中也應該註冊的是介面名,這樣保證了服務的可拓展性。
這些具體的操作都被放在了 ServerMain 這個類中,服務端的啟動也是通過這個類的start方法來實現。
@Slf4j public class ServerMain { private static final int DEFAULT_SERVER_PORT = 8890; private IRegistry registry; private int port; private final String packagePath; public ServerMain(String packagePath) { this(packagePath, new EtcdRegistry()); } public ServerMain(String packagePath, IRegistry registry) { this.registry = registry; this.packagePath = packagePath; this.port = System.getProperty("server.port") == null ? DEFAULT_SERVER_PORT : Integer.parseInt(System.getProperty("server.port")); } public void start() { Reflections reflections = new Reflections(packagePath); Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcService.class); classes.forEach(clazz -> { try { Class<?>[] interfaces = clazz.getInterfaces(); String clazzName = clazz.getName(); if (interfaces != null && interfaces.length > 0) { //簡單實現,所以只獲取了第一個interface的name,實際上並不準確,可能有誤。 clazzName = interfaces[0].getName(); } //註冊的是 介面名 和 服務例項 //clazzMap是用來儲存一個例項物件,相當於服務端的單例 ServerHandler.clazzMap.put(clazzName, clazz.newInstance()); registry.register(clazzName, port); } catch (Exception e) { log.error("register service failed : " + e.getLocalizedMessage(), e); } }); ////新開執行緒的話會程式會退出(如果在springboot的建構函式中則另開執行緒啟動,否則會阻塞專案的啟動) //new Thread(() -> { Server server = new Server(port); server.start(); //}).start(); } }
客戶端也通過一個 ClientServer 類實現了客戶端服務的啟動工作,將註解處理,服務查詢和快取等任務進行實現。
find方法返回的結果是一個EndPoint的list,對應提供相應服務的n個節點,每個相同的endpoint對應了我封裝的一個netty的client,具體會在client的部分講明~~
@Slf4j public class ClientServer { private IRegistry registry; //設定一個endpoint使用一個client,netty高效理論上滿足使用 private static ConcurrentHashMap<EndPoint, Client> clientMap = new ConcurrentHashMap<>(); private static ConcurrentHashMap<String, List<EndPoint>> serviceMap = new ConcurrentHashMap<>(); private final String packagePath; private static final Random random = new Random(); public ClientServer(String packagePath) { this.packagePath = packagePath; this.registry = new EtcdRegistry(); } public void start() { Reflections reflections = new Reflections(packagePath); Set<Class<?>> classes = reflections.getTypesAnnotatedWith(RpcInterface.class); EventLoopGroup eventLoopGroup = Epoll.isAvailable() ? new EpollEventLoopGroup(4) : new NioEventLoopGroup(4); //定時任務執行緒池,定時更新服務列表,設定為3分鐘 ScheduledExecutorService executorService = new ScheduledThreadPoolExecutor(2); classes.forEach(clazz -> executorService.scheduleAtFixedRate(() -> { try { //拿到當前仍在註冊中心中的相應服務列表 // TODO 刪除掉對應失效的endpoint Class<?>[] interfaces = clazz.getInterfaces(); String className = clazz.getName(); if (interfaces != null && interfaces.length > 0) { className = interfaces[0].getName(); } List<EndPoint> list = registry.find(className); serviceMap.put(className, list); list.forEach(endPoint -> { if (clientMap.get(endPoint) == null) { //所有的Client共用一個EventLoopGroup Client client = new Client(endPoint.getHost(), endPoint.getPort(), eventLoopGroup); clientMap.put(endPoint, client); } }); } catch (Exception e) { e.printStackTrace(); } }, 0, 3 * 60, TimeUnit.SECONDS)); } public static Client getClient(String serviceName) { List<EndPoint> endPoints = serviceMap.get(serviceName); // 簡單的負載均衡,只使用了隨機選擇 if (endPoints != null) { EndPoint endPoint = endPoints.get(random.nextInt(endPoints.size())); return clientMap.get(endPoint); } return null; } }
以上就完成了服務註冊與發現的工作,同時也提供了非常簡單易用的啟動介面,等會看看example就知道啦。
Client端的服務代理
瀏覽器的請求被髮送到Client端後,雖然表面上是通過本地的方法返回,但實質上其實是遠端方法的呼叫,這主要是動過Java中的動態代理來實現的。
這個專案中的動態代理是通過CGLIB來實現的,關於CGLIB和JDK原生動態代理的區別,我就不細述,網上有很多相關的文章。
先宣告一個 ProxyFactory 類,用於在注入服務物件時生成代理
public class ProxyFactory { public static <T> T create(Class<T> clazz) { Enhancer enhancer = new Enhancer(); enhancer.setSuperclass(clazz); enhancer.setCallback(new ProxyIntercepter()); return (T) enhancer.create(); } }
在呼叫代理類的方法時,實際上會呼叫到 ProxyIntercepter 中的intercepter方法。所以實際上的請求會在intercepter方法裡傳送。
ProxyIntercepter主要內容如下
@Slf4j public class ProxyIntercepter implements MethodInterceptor { @Override public Object intercept(Object o, Method method, Object[] parameters, MethodProxy methodProxy) throws Throwable { RpcRequest rpcRequest = new RpcRequest(); Class clazz = method.getDeclaringClass(); Class<?>[] interfaces = clazz.getInterfaces(); //存在介面時使用的是介面名稱 String clazzName = clazz.getName(); if (interfaces != null && interfaces.length > 0) { clazzName = interfaces[0].getName(); } rpcRequest.setClassName(clazzName); rpcRequest.setServiceName(method.getName()); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setParameters(parameters); Client client = ClientServer.getClient(rpcRequest.getClassName()); RpcFuture rpcFuture; if (client != null) { ChannelFuture channelFuture = client.connectChannel(); rpcFuture = new RpcFuture(channelFuture.channel().eventLoop()); if (channelFuture.isSuccess()) { sendRequest(rpcRequest, rpcFuture, channelFuture); } else { channelFuture.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { sendRequest(rpcRequest, rpcFuture, future); } else { log.error("send request error ", future.cause()); } }); } //這裡沒有用listener & getNow的方式獲取主要是考慮客戶端本身非非同步的情形,同時是為了簡便實現。 RpcResponse rpcResponse = rpcFuture.get(5, TimeUnit.SECONDS); if (rpcResponse.getException() == null) { return rpcResponse.getResult(); } else { throw rpcResponse.getException(); } } else { log.error("no rpcService is available :" + rpcRequest.getClassName()); return null; } } private void sendRequest(RpcRequest rpcRequest, RpcFuture rpcFuture, ChannelFuture channelFuture) { channelFuture.channel().writeAndFlush(rpcRequest) .addListener((ChannelFutureListener) writefuture -> { if (writefuture.isSuccess()) { FutureHolder.registerFuture(rpcRequest.getRequestId(), rpcFuture); log.info("send request success"); } else { rpcFuture.tryFailure(writefuture.cause()); log.error("send request failed", writefuture.cause()); } }); } }
可以看到我仍然是獲取了方法所在類的介面,並根據介面名查找了相應的Client物件,傳送請求。
請求的傳送與處理
為了服務端能成功利用反射進行方法呼叫,客戶端的請求應該包含一些引數,在 RpcRequest 類中。
@Data public class RpcRequest { private static AtomicLong atomicLong = new AtomicLong(0); // 請求Idnetty的請求是非同步的,為了複用連線,一般會帶個id,這樣在收到返回資訊的時候能一一對應。 private long requestId; // 類名 private String className; // 服務名 private String serviceName; // 引數型別 private Class<?>[] parameterTypes; // 引數 private Object[] parameters; public RpcRequest() { this.requestId = atomicLong.getAndIncrement(); } public RpcRequest(long requestId) { this.requestId = requestId; } }
服務端返回的資料包含的內容如下
@Data public class RpcResponse { private long requestId; private Throwable exception; private Object result; public RpcResponse(long requestId) { this.requestId = requestId; } public RpcResponse() { } }
Client端和Server端均使用了Netty作為網路框架。
其實利用HTTP協議+Json也可以完成我們的基本需求,但是在大部分Rpc框架中,為了提高效能,往往都會自定義一個滿足需求的協議,並採用一些更為高效的序列化方案,如ProtoBuf,Tyro等。 本專案採用了ProtoStuff作為序列化方案, 和ProtoBuf相比主要是省去了初始生成proto檔案等步驟,提供了較為易用的介面。(網上說ProtoStuff在序列化一些集合類的時候會有bug,我自己測試了一下HashMap和ArrayList,都沒有出現問題,就沒有專門去解決這一問題)
編寫了一個ProtoStuffUtil的工具類,提供serializer和deserializer方法(網上有好多這種程式碼,參考了一些,因為很多文章,就沒有標註來源了)。
public class ProtoStuffUtil { public static <T> byte[] serializer(T o) { Schema schema = RuntimeSchema.getSchema(o.getClass()); return ProtostuffIOUtil.toByteArray(o, schema, LinkedBuffer.allocate()); } public static <T> T deserializer(byte[] bytes, Class<T> clazz) { Schema<T> schema = RuntimeSchema.createFrom(clazz); T message = schema.newMessage(); ProtostuffIOUtil.mergeFrom(bytes, message, schema); return message; } }
工具類提供的方法在Netty的Encoder和Decoder中使用(參考了這篇文章 《使用netty結合Protostuff傳輸物件例子》 ),這兩個類會在Netty啟動時被我利用一個Initializer新增到Netty的pipeline中去,關於Netty的責任鏈模式可以檢視其他相關文章。
RpcEncoder
public class RpcEncoder extends MessageToByteEncoder { private Class<?> targetClazz; public RpcEncoder(Class<?> targetClazz) { this.targetClazz = targetClazz; } @Override protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { if (targetClazz.isInstance(msg)) { byte[] data = ProtoStuffUtil.serializer(msg); out.writeInt(data.length); out.writeBytes(data); } } }
RpcDecoder
public class RpcDecoder extends ByteToMessageDecoder { private Class<?> targerClass; public RpcDecoder(Class<?> targerClass) { this.targerClass = targerClass; } @Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { if (in.readableBytes() < 4) { return; } in.markReaderIndex(); int dataLen = in.readInt(); if (dataLen < 0) { ctx.close(); } if (in.readableBytes() < dataLen) { in.resetReaderIndex(); return; } byte[] data = new byte[dataLen]; in.readBytes(data); Object obj = ProtoStuffUtil.deserializer(data, targerClass); out.add(obj); } }
接下來來說用於傳送請求的Client類,為了複用TCP連線,Netty的channel都添加了KEEPALIVE的引數,同時,我也在每次獲取了一個EndPoint的屬性後建立一個新的Client,儲存在了ClientMap中。每個Client是對應一個EndPoint的,如果併發量大的話,其實可以多個Client對應於一個EndPoint,這時就可以將我們的Client設計為一個連線池,也可以是Netty自身提供的連線池,這方面擴充套件我就不細講,我github上的iustu-agent裡面Connection那一塊本來有用到連線池,後來發現效能並沒有什麼提升就沒再用了。
Client本質上就是一個Netty的Client,我設定成同一個客戶端的所有Client都會共用一個EventLoopGroup,主要是為了資源的合理利用吧。因為負載不高的情況下同一個EventLoopGroup其實是夠用的,並且還會有浪費。
Client的構成如下
@Slf4j public class Client { private EventLoopGroup eventLoopGroup; private Channel channel; private ChannelFuture channelFuture; private String host; private int port; public Client(String host, int port) { this(host, port, Epoll.isAvailable() ? new EpollEventLoopGroup(1) : new NioEventLoopGroup(1)); } public Client(String host, int port, EventLoopGroup eventLoopGroup) { this.host = host; this.port = port; this.eventLoopGroup = eventLoopGroup; } public ChannelFuture connectChannel() { if (channelFuture == null) { channelFuture = new Bootstrap().group(eventLoopGroup) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.ALLOCATOR, ByteBufAllocator.DEFAULT) .channel(Epoll.isAvailable() ? EpollSocketChannel.class : NioSocketChannel.class) .handler(new ClientInitialzer()) .connect(host, port) .addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { channel = future.channel(); log.info("start a client to " + host + ":" + port); channel.closeFuture().addListener((ChannelFutureListener) closefuture -> { log.info("stop the client to " + host + ":" + port); }); } else { log.error("start a Client failed", future.cause()); } }) ; } return channelFuture; } public Channel getChannel() { if (channel != null) { return channel; } else { channelFuture = connectChannel(); return channelFuture.channel(); } } }
可以看見只有第一次呼叫 connectChannel 方法的時候才會真正的向服務端發起連線,這會返回一個channelFuture,但是channelFuture的結果並不一定已經成功了。
所以在 ProxyIntecepter 裡我也做了相應的處理,完整程式碼上面已附。處理部分的程式碼如下
ChannelFuture channelFuture = client.connectChannel(); rpcFuture = new RpcFuture(channelFuture.channel().eventLoop()); if (channelFuture.isSuccess()) { sendRequest(rpcRequest, rpcFuture, channelFuture); } else { channelFuture.addListener((ChannelFutureListener) future -> { if (future.isSuccess()) { sendRequest(rpcRequest, rpcFuture, future); } else { log.error("send request error ", future.cause()); } }); }
請求傳送的時候本地會在 RpcHolder (內部使用了ThreadLocal的HashMap) 中儲存一個 RpcFuture(實質上只是繼承了Netty中的DefaultPromise),與唯一的 RequestId 相對應,rpcfuture.get()會先wait,並在請求得到返回之後被notify並返回結果,如果結果中包含Exception,則會丟擲異常。
服務端呼叫與返回
相比於Client而言,Server端的實現就簡單了許多,只是開啟了一個ServerBootStarp並繫結在指定的埠上接受請求。
收到請求後,會通過請求中攜帶的服務名查詢到相應的物件(在註冊服務的同時被新增到clazzMap中去的impl例項),並利用反射呼叫其中方法,對結果進行返回。這裡用的也是CGLIB中提供的FastClass來實現反射。
主要的程式碼都在 ServerHandler 中了
@Slf4j @ChannelHandler.Sharable public class ServerHandler extends SimpleChannelInboundHandler<RpcRequest> { public static ConcurrentHashMap<String, Object> clazzMap = new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, RpcRequest msg) throws Exception { log.info("recieve a request id : " + msg.getRequestId()); RpcResponse rpcResponse = getResponse(msg); ctx.writeAndFlush(rpcResponse).addListener((GenericFutureListener<ChannelFuture>) future -> { if (!future.isSuccess()) { log.error(future.cause().getLocalizedMessage()); } }); } private RpcResponse getResponse(RpcRequest rpcRequest) { RpcResponse rpcResponse = new RpcResponse(rpcRequest.getRequestId()); try { Class<?> clazz = Class.forName(rpcRequest.getClassName()); Object c = clazzMap.get(rpcRequest.getClassName()); if (c == null) { clazzMap.put(rpcRequest.getClassName(), clazz.newInstance()); c = clazzMap.get(rpcRequest.getClassName()); } String methodName = rpcRequest.getServiceName(); Class<?>[] parameterTypes = rpcRequest.getParameterTypes(); Object[] parameters = rpcRequest.getParameters(); FastClass fastClass = FastClass.create(clazz); FastMethod fastMethod = fastClass.getMethod(methodName, parameterTypes); //與Spring聯合使用時應該呼叫ApplicationContext裡面的已有的bean Object result = fastMethod.invoke(c, parameters); rpcResponse.setResult(result); } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | InvocationTargetException e) { e.printStackTrace(); rpcResponse.setException(e); } return rpcResponse; } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { if (cause instanceof IOException) { if (cause.getLocalizedMessage().equals("遠端主機強迫關閉了一個現有的連線。")) { log.info("一個客戶端連線斷開。"); return; } } super.exceptionCaught(ctx, cause); } }
試著使用一下~
先將當前的專案安裝到本地倉庫中去以供其他專案使用~
mvn install
這樣就OK了 ,最好還是用idea做這些比較方便
然後新建一個SpringBoot專案,加入web的元件,新增三個模組,組成如下圖

test組成圖
同時在pom.xml中加入easyrpc的引用
<dependency> <groupId>com.alexzfx</groupId> <artifactId>easyrpc</artifactId> <version>1.0-SNAPSHOT</version> </dependency>
在common包中,我們在 com.alexzfx.easyrpc.com
目錄下建立 HelloService 介面,宣告 sayHello方法
package com.alexzfx.easyrpc.common; public interface HelloService { String sayHello(); }
然後在server包中建立一個實現類,標識 @RpcService 註解
package com.alexzfx.easyrpc.server; import com.alexzfx.easyrpc.commom.annotation.RpcService; import com.alexzfx.easyrpc.common.HelloService; @RpcService public class HelloServiceImpl implements HelloService { @Override public String sayHello() { return "Hello EasyRPC"; } }
在client包中也建立一個實現類,標識 @RpcInterface 註解,但只是實現,不做其他方法內容的實現
package com.alexzfx.easyrpc.client; import com.alexzfx.easyrpc.commom.annotation.RpcInterface; import com.alexzfx.easyrpc.common.HelloService; @RpcInterface public class HelloServiceImpl implements HelloService { @Override public String sayHello() { return null; } }
在server包中建立一個main方法,實現我們的服務啟動功能
package com.alexzfx.easyrpc.server; public class ServerApplication { public static void main(String[] args) { ServerMain serverMain = new ServerMain("com.alexzfx.easyrpc.server"); serverMain.start(); } }
在client端中實現一個controller,並返回 sayHello 方法的結果,啟動前要先完成我們client端的啟動工作,並將本地使用的服務類建立為我們的代理類(因為本專案沒有完全對應Spring做配置,如果是專門為了和Spring整合的話,可以直接在applicationContext載入的時候將代理類新增進去,這樣就可以使 @Autowire 註解注入的是我們的代理類了)。
import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @RestController public class ClientApplication { public ClientApplication() { ClientServer clientServer = new ClientServer("com.alexzfx.easyrpc.client"); clientServer.start(); helloService = ProxyFactory.create(HelloServiceImpl.class); } private HelloService helloService; public static void main(String[] args) { SpringApplication.run(ClientApplication.class); } @RequestMapping("/") public String sayhello() { return helloService.sayHello(); } }
這樣,我們整體的測試專案就構建完成了。
啟動 etcd ,Windows上只要下載連結中相應的版本,解壓縮後直接雙擊啟動 etcd.exe 即可,其他系統也類似。
啟動 server 端的 main 函式,啟動時註冊了服務,所在埠等日誌都會被打印出來。
再啟動client端 SpringBoot的 main 函式。
訪問 http://localhost:8080 你就可以看到 Hello EasyRPC
的出現啦。
總結
通過以上的內容,實現了一個簡單易用,包括了服務註冊與發現功能的RPC框架。實際上一個可通用的RPC框架,要處理的事情遠遠不止上面做的這麼簡單。
這是我第一次寫完整的技術類部落格,有許多不足的地方,希望發現的老哥幫忙指正~
如果你覺得還不錯的話,希望能給我的這個專案點個小小的 star ,這就是對我最大的鼓勵啦。 EasyRPC