從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現
RPC-Client模組負責建立 動態代理物件 供 服務消費者 使用,而動態代理物件的方法執行則是通過RPC呼叫RPC-Server的服務實現。即RPC-Client遮蔽了底層的通訊過程,使得服務消費者可以基於介面透明使用服務提供者的服務。
系列文章:
從零寫分散式RPC框架 系列 1.0 (1)架構設計
從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現
從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現
從零寫分散式RPC框架 系列 1.0 (5)整合測試
使用gpg外掛釋出jar包到Maven中央倉庫 完整實踐
文章目錄
一 介紹
1 整體結構
2 模組介紹
整體結構如下:
- RpcClient
RPC-Client模組核心類,也是服務消費者會直接使用到的類。該類負責根據傳入的介面類,生成一個可以執行遠端方法的動態代理物件。 - ZKServiceDiscovery
主要負責 服務發現 ,根據服務名從ZK叢集中獲取相應RPC-Client地址資訊 - RpcClientHandler
Rpc客戶端處理器,將RPC-Server返回的RpcResponse物件存入管理。 - ZKProperties
屬性注入類。 - RpcClientAutoConfiguration 和 spring.factories
封裝成spring-boot-starter所需配置,開啟自動裝配。
3 工作流程
二 pom檔案
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-netty-client-spring-boot-autoconfigure</artifactId>
<parent>
<groupId>com.github.linshenkx</groupId>
<artifactId>rpc-netty-spring-boot-starter</artifactId>
<version>1.0.5.RELEASE</version>
<relativePath>../</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
</dependency>
<dependency>
<groupId>com.github.linshenkx</groupId>
<artifactId>rpc-netty-common</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
</dependency>
</dependencies>
</project>
三 簡單元件:屬性注入類和自動裝配類
屬性裝配類ZKProperties和RPC-Server的基本一致,自動裝配類也是,原理相通。RpcClientAutoConfiguration原始碼如下
可以看到整個過程要管理的類其實比RPC-Server的要少很多。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/11/2
* @Description: TODO
*/
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(RpcClient.class)
public class RpcClientAutoConfiguration {
@ConditionalOnMissingBean
@Bean
public ZKProperties defaultZKProperties(){
return new ZKProperties();
}
@ConditionalOnMissingBean
@Bean
public ZKServiceDiscovery zkServiceDiscovery(){
return new ZKServiceDiscovery();
}
@Bean
public RpcClient rpcClient(){
return new RpcClient();
}
}
四 ZKServiceDiscovery
如下,核心方法為 discover,用於根據服務名從ZK叢集獲取到提供該服務的RPC-Server資訊。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/10/31
* @Description: zookeeper服務註冊中心
*/
@Component
@Log4j2
@EnableConfigurationProperties(ZKProperties.class)
public class ZKServiceDiscovery {
@Autowired
private ZKProperties zkProperties;
/**
* 為客戶端提供地址
* 根據服務名獲取服務地址
* @param serviceName
* @return
*/
public String discover(String serviceName){
ZkClient zkClient = new ZkClient(getAddress(zkProperties.getAddressList()), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
try {
String servicePath=zkProperties.getRegistryPath()+"/"+serviceName;
//找不到對應服務
if(!zkClient.exists(servicePath)){
throw new RuntimeException("can not find any service node on path: "+servicePath);
}
//該服務下無節點可用
List<String> addressList=zkClient.getChildren(servicePath);
if(CollectionUtils.isEmpty(addressList)){
throw new RuntimeException("can not find any address node on path: "+servicePath);
}
//獲取address節點地址
String address=getRandomAddress(addressList);
//獲取address節點的值
return zkClient.readData(servicePath+"/"+address);
}finally {
zkClient.close();
}
}
public String getAddress(List<String> addressList){
if(CollectionUtils.isEmpty(addressList)){
String defaultAddress="localhost:2181";
log.error("addressList is empty,using defaultAddress:"+defaultAddress);
return defaultAddress;
}
//待改進策略
String address= getRandomAddress(addressList);
log.info("using address:"+address);
return address;
}
private String getRandomAddress(List<String> addressList){
return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size()));
}
}
五 RpcClient
整個模組的核心類,用於生成服務的動態代理類並實現遠端方法呼叫。
create 方法
根據傳入介面類返回動態代理類
主要使用了JDK的Proxy.newProxyInstance方法,其最關鍵的步驟就是實現InvocationHandler的invoke方法,這個時候便可以根據傳入資料,構造封裝成RpcRequest物件,然後利用 ZKServiceDiscovery 獲取對於RPC-Server的地址資訊。再呼叫send方法將RpcRequest傳送到對應地址,並獲取RpcResponse,再對RpcResult處理(如區分是否有異常等),返回最終的處理結果。完成invoke方法的實現。
send方法
根據RpcRequest物件和Rpc-Server地址資訊,建立Netty連線,將RpcRequest傳送到Rpc-Server並獲取RpcResponse。
responseMap成員
responseMap的作用是作為一箇中介暫存RpcResponse,因為Netty的RpcClientHandler在獲取到RpcResponse物件還無法直接返回給send方法,所以就將其放入responseMap中,再由send方法根據RpcResponse對應的RequestId去獲取。
需要注意,responseMap是 ConcurrentHashMap 型別的。因為目前的流程是每執行一次Rpc方法即執行一次Rpc呼叫都需要建立、斷開netty連線,並且在這個過程需要根據requestId存放RpcResponse再將其移除。需要保證responseMap的執行緒安全。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/11/1
* @Description: TODO
*/
@Log4j2
@Component
@AutoConfigureAfter(ZKServiceDiscovery.class)
public class RpcClient {
@Autowired
private ZKServiceDiscovery zkServiceDiscovery;
/**
* 存放請求編號與響應物件的對映關係
*/
private ConcurrentMap<String, RpcResponse> responseMap=new ConcurrentHashMap<>();
@SuppressWarnings("unchecked")
public <T> T create(final Class<?> interfaceClass){
//建立動態代理物件
return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
new Class<?>[]{interfaceClass},
new InvocationHandler() {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
//建立RPC請求物件
RpcRequest rpcRequest=new RpcRequest();
rpcRequest.setRequestId(UUID.randomUUID().toString());
rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
rpcRequest.setMethodName(method.getName());
rpcRequest.setParameterTypes(method.getParameterTypes());
rpcRequest.setParameters(args);
//獲取RPC服務地址
String serviceName=interfaceClass.getName();
String serviceAddress=zkServiceDiscovery.discover(serviceName);
log.info("get serviceAddres:"+serviceAddress);
//從RPC服務地址中解析主機名與埠號
String[] stringArray= StringUtils.split(serviceAddress,":");
String host= Objects.requireNonNull(stringArray)[0];
int port=Integer.parseInt(stringArray[1]);
//傳送RPC請求
RpcResponse rpcResponse=send(rpcRequest,host,port);
//獲取響應結果
if(rpcResponse==null){
log.error("send request failure",new IllegalStateException("response is null"));
return null;
}
if(rpcResponse.getException()!=null){
log.error("response has exception",rpcResponse.getException());
return null;
}
return rpcResponse.getResult();
}
}
);
}
private RpcResponse send(RpcRequest rpcRequest,String host,int port){
log.info("send begin: "+host+":"+port);
//客戶端執行緒為1即可
EventLoopGroup group=new NioEventLoopGroup(1);
try {
//建立RPC連線
Bootstrap bootstrap=new Bootstrap();
bootstrap.group(group);
bootstrap.channel(NioSocketChannel.class);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ChannelPipeline pipeline=channel.pipeline();
pipeline.addLast(new RpcEncoder(RpcRequest.class))
.addLast(new RpcDecoder(RpcResponse.class))
.addLast(new RpcClientHandler(responseMap));
}
});
ChannelFuture future=bootstrap.connect(host,port).sync();
log.info("requestId: "+rpcRequest.getRequestId());
//寫入RPC請求物件
Channel channel=future.channel();
channel.writeAndFlush(rpcRequest).sync();
channel.closeFuture().sync();
log.info("send end");
//獲取RPC響應物件
return responseMap.get(rpcRequest.getRequestId());
}catch (Exception e){
log.error("client exception",e);
return null;
}finally {
group.shutdownGracefully();
//移除請求編號和響應物件直接的對映關係
responseMap.remove(rpcRequest.getRequestId());
}
}
}
六 RpcClientHandler
RpcClientHandler 只是用來接收資料,並不需要做什麼處理,這裡的作用就是將 RpcResponse 存入 responseMap 。
/**
* @version V1.0
* @author: lin_shen
* @date: 2018/11/1
* @Description: TODO
*/
@Log4j2
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {
private ConcurrentMap<String,RpcResponse> responseMap;
public RpcClientHandler(ConcurrentMap<String,RpcResponse> responseMap){
this.responseMap=responseMap;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
log.info("read a Response,requestId: "+msg.getRequestId());
responseMap.put(msg.getRequestId(),msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
log.error("client caught exception",cause);
ctx.close();
}
}