使用Netty實現遠程方法調用(RPC)
很多情況下,我們可能需要用到調用遠程方法的時候。比如,我們有統一的布隆過濾器,其它服務需要調用布隆過濾器進行判重;比如,我們需要調用統一的緩存數據;比如我們需要跨機器調用一些服務方法等等。這些時候都可以使用遠程方法調用。
接下來,開始講解使用Netty實現遠程方法調用的步驟。
代碼目錄結構為:
nettynetty/client//RPC消息回調類netty/client/MessageCallBack.java//Rpc客戶端管道初始化netty/client/MessageSendChannelInitializer.java//RPC客戶端消息發送執行(動態代理)類netty/client/MessageSendExecutor.java//RPC客戶端消息發送處理類netty/client/MessageSendHandler.java//Rpc客戶端線程任務處理netty/client/MessageSendInitializeTask.java//Rpc客戶端代理netty/client/MessageSendProxy.java//RPC客戶端消息序列化協議框架netty/client/RpcSendSerializeFrame.java//rpc客戶端服務器配置加載netty/client/RpcServerLoader.javanetty/interfnetty/interf.impl//接口實現類netty/interf/impl/CalculateImpl.java//接口netty/interf/Calculate.javanetty/serializenetty/serialize.kryo//Kryo RPC消息進行編碼、解碼類netty/serialize/kryo/KryoCodecUtil.java//Kryo ×××netty/serialize/kryo/KryoDecoder.java//Kryo 編碼器netty/serialize/kryo/KryoEncoder.java//Kryo 工廠類netty/serialize/kryo/KryoPoolFactory.java//Kryo RPC序列化類netty/serialize/kryo/KryoSerialize.java//RPC消息進行編碼、解碼接口netty/serialize/MessageCodecUtil.java//消息×××netty/serialize/MessageDecoder.java//消息編碼器netty/serialize/MessageEncoder.java//RPC消息序列化/反序列化接口定義netty/serialize/RpcSerialize.java//RPC消息序序列化協議選擇器接口netty/serialize/RpcSerializeFrame.java//RPC消息序序列化協議類型netty/serialize/RpcSerializeProtocol.javanetty/server//服務端: 線程池異常策略netty/server/AbortPolicyWithReport.java//Rpc服務器執行模塊netty/server/MessageRecvChannelInitializer.java//服務器執行模塊netty/server/MessageRecvExecutor.java//Rpc服務器消息處理netty/server/MessageRecvHandler.java//Rpc服務器消息線程任務處理netty/server/MessageRecvInitializeTask.java//線程工廠:實際上就是對Runable進行一個包裝,對線程設置一些信息和監控信息netty/server/NamedThreadFactory.java//RPC服務端消息序列化協議框架netty/server/RpcRecvSerializeFrame.java//自定義的線程池netty/server/RpcThreadPool.java//消息的請求體netty/MessageRequest.java//響應的請求體netty/MessageResponse.java//客戶端netty/NettyClient.java//服務端netty/NettyServer.java
啟動NettyServer服務端:
public static void main(String[] args) { MessageRecvExecutor executor = new MessageRecvExecutor("127.0.0.1:8686", RpcSerializeProtocol.KRYOSERIALIZE.name()); try { executor.startServer(); } catch (Exception e) { e.printStackTrace(); } }
控制臺打印:
RPC Server start success!ip:127.0.0.1port:8686protocol:RpcSerializeProtocol[serializeProtocol=kryo,name=KRYOSERIALIZE,ordinal=1]
啟動NettyClient客戶端,並遠程調用Calculate接口中的add方法:
final static MessageSendExecutor executor = new MessageSendExecutor("127.0.0.1:8686",RpcSerializeProtocol.KRYOSERIALIZE
.name());
final static Calculate cal = executor.execute(Calculate.class);
public static void main(String[] args) throws Exception {
for(int i = 1;i< 100000;i++)
ThreadExecuteUtil.submitTaskBlock(ThreadExecuteUtil.CLIENT, new Runnable() {
@Override
public void run() {
long start = System.currentTimeMillis();
int result = cal.add(10, 20);
System.out.println("result:"+result);
long end = System.currentTimeMillis();
System.out.println(end-start);
}
}, 1000);
}
使用Netty實現遠程方法調用(RPC)