1. 程式人生 > >從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現

從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現

RPC-Client模組負責建立 動態代理物件 供 服務消費者 使用,而動態代理物件的方法執行則是通過RPC呼叫RPC-Server的服務實現。即RPC-Client遮蔽了底層的通訊過程,使得服務消費者可以基於介面透明使用服務提供者的服務。

系列文章:

從零寫分散式RPC框架 系列 1.0 (1)架構設計
從零寫分散式RPC框架 系列 1.0 (2)RPC-Common模組設計實現
從零寫分散式RPC框架 系列 1.0 (3)RPC-Server模組設計實現
從零寫分散式RPC框架 系列 1.0 (4)RPC-Client模組設計實現
從零寫分散式RPC框架 系列 1.0 (5)整合測試


使用gpg外掛釋出jar包到Maven中央倉庫 完整實踐

文章目錄

一 介紹

1 整體結構

整體結構

2 模組介紹

整體結構如下:

  1. RpcClient
    RPC-Client模組核心類,也是服務消費者會直接使用到的類。該類負責根據傳入的介面類,生成一個可以執行遠端方法的動態代理物件。
  2. ZKServiceDiscovery
    主要負責 服務發現 ,根據服務名從ZK叢集中獲取相應RPC-Client地址資訊
  3. RpcClientHandler
    Rpc客戶端處理器,將RPC-Server返回的RpcResponse物件存入管理。
  4. ZKProperties
    屬性注入類。
  5. RpcClientAutoConfiguration 和 spring.factories
    封裝成spring-boot-starter所需配置,開啟自動裝配。

3 工作流程

二 pom檔案

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-netty-client-spring-boot-autoconfigure</artifactId>

    <parent>
        <groupId>com.github.linshenkx</groupId>
        <artifactId>rpc-netty-spring-boot-starter</artifactId>
        <version>1.0.5.RELEASE</version>
        <relativePath>../</relativePath>
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
        </dependency>
        <dependency>
            <groupId>com.github.linshenkx</groupId>
            <artifactId>rpc-netty-common</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
        </dependency>
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
        </dependency>
    </dependencies>

</project>

三 簡單元件:屬性注入類和自動裝配類

屬性裝配類ZKProperties和RPC-Server的基本一致,自動裝配類也是,原理相通。RpcClientAutoConfiguration原始碼如下

可以看到整個過程要管理的類其實比RPC-Server的要少很多。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/11/2
 * @Description: TODO
 */
@Configuration
@EnableConfigurationProperties
@ConditionalOnClass(RpcClient.class)
public class RpcClientAutoConfiguration {
    @ConditionalOnMissingBean
    @Bean
    public ZKProperties defaultZKProperties(){
        return new ZKProperties();
    }

    @ConditionalOnMissingBean
    @Bean
    public ZKServiceDiscovery zkServiceDiscovery(){
        return new ZKServiceDiscovery();
    }

    @Bean
    public RpcClient rpcClient(){
        return new RpcClient();
    }
}

四 ZKServiceDiscovery

如下,核心方法為 discover,用於根據服務名從ZK叢集獲取到提供該服務的RPC-Server資訊。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/10/31
 * @Description: zookeeper服務註冊中心
 */
@Component
@Log4j2
@EnableConfigurationProperties(ZKProperties.class)
public class ZKServiceDiscovery {

  @Autowired
  private ZKProperties zkProperties;

  /**
   * 為客戶端提供地址
   * 根據服務名獲取服務地址
   * @param serviceName
   * @return
   */
  public String discover(String serviceName){
    ZkClient zkClient = new ZkClient(getAddress(zkProperties.getAddressList()), zkProperties.getSessionTimeOut(), zkProperties.getConnectTimeOut());
    try {
      String servicePath=zkProperties.getRegistryPath()+"/"+serviceName;
      //找不到對應服務
      if(!zkClient.exists(servicePath)){
        throw new RuntimeException("can not find any service node on path: "+servicePath);
      }
      //該服務下無節點可用
      List<String> addressList=zkClient.getChildren(servicePath);
      if(CollectionUtils.isEmpty(addressList)){
        throw new RuntimeException("can not find any address node on path: "+servicePath);
      }
      //獲取address節點地址
      String address=getRandomAddress(addressList);
      //獲取address節點的值
      return zkClient.readData(servicePath+"/"+address);
    }finally {
      zkClient.close();
    }
  }
  
  public String getAddress(List<String> addressList){
    if(CollectionUtils.isEmpty(addressList)){
      String defaultAddress="localhost:2181";
      log.error("addressList is empty,using defaultAddress:"+defaultAddress);
      return defaultAddress;
    }
    //待改進策略
    String address= getRandomAddress(addressList);
    log.info("using address:"+address);
    return address;
  }

  private String getRandomAddress(List<String> addressList){
    return addressList.get(ThreadLocalRandom.current().nextInt(addressList.size()));
  }
  
}

五 RpcClient

整個模組的核心類,用於生成服務的動態代理類並實現遠端方法呼叫。

create 方法

根據傳入介面類返回動態代理類
主要使用了JDK的Proxy.newProxyInstance方法,其最關鍵的步驟就是實現InvocationHandler的invoke方法,這個時候便可以根據傳入資料,構造封裝成RpcRequest物件,然後利用 ZKServiceDiscovery 獲取對於RPC-Server的地址資訊。再呼叫send方法將RpcRequest傳送到對應地址,並獲取RpcResponse,再對RpcResult處理(如區分是否有異常等),返回最終的處理結果。完成invoke方法的實現。

send方法

根據RpcRequest物件和Rpc-Server地址資訊,建立Netty連線,將RpcRequest傳送到Rpc-Server並獲取RpcResponse。

responseMap成員

responseMap的作用是作為一箇中介暫存RpcResponse,因為Netty的RpcClientHandler在獲取到RpcResponse物件還無法直接返回給send方法,所以就將其放入responseMap中,再由send方法根據RpcResponse對應的RequestId去獲取。

需要注意,responseMap是 ConcurrentHashMap 型別的。因為目前的流程是每執行一次Rpc方法即執行一次Rpc呼叫都需要建立、斷開netty連線,並且在這個過程需要根據requestId存放RpcResponse再將其移除。需要保證responseMap的執行緒安全。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/11/1
 * @Description: TODO
 */
@Log4j2
@Component
@AutoConfigureAfter(ZKServiceDiscovery.class)
public class RpcClient {
    @Autowired
    private ZKServiceDiscovery zkServiceDiscovery;
    /**
     * 存放請求編號與響應物件的對映關係
     */
    private ConcurrentMap<String, RpcResponse> responseMap=new ConcurrentHashMap<>();

    @SuppressWarnings("unchecked")
    public <T> T create(final Class<?> interfaceClass){
        //建立動態代理物件
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
                new Class<?>[]{interfaceClass},
                new InvocationHandler() {
                    @Override
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        //建立RPC請求物件
                        RpcRequest rpcRequest=new RpcRequest();
                        rpcRequest.setRequestId(UUID.randomUUID().toString());
                        rpcRequest.setInterfaceName(method.getDeclaringClass().getName());
                        rpcRequest.setMethodName(method.getName());
                        rpcRequest.setParameterTypes(method.getParameterTypes());
                        rpcRequest.setParameters(args);
                        //獲取RPC服務地址
                        String serviceName=interfaceClass.getName();
                        String serviceAddress=zkServiceDiscovery.discover(serviceName);
                        log.info("get serviceAddres:"+serviceAddress);
                        //從RPC服務地址中解析主機名與埠號
                        String[] stringArray= StringUtils.split(serviceAddress,":");
                        String host= Objects.requireNonNull(stringArray)[0];
                        int port=Integer.parseInt(stringArray[1]);
                        //傳送RPC請求
                        RpcResponse rpcResponse=send(rpcRequest,host,port);
                        //獲取響應結果
                        if(rpcResponse==null){
                            log.error("send request failure",new IllegalStateException("response is null"));
                            return null;
                        }
                        if(rpcResponse.getException()!=null){
                            log.error("response has exception",rpcResponse.getException());
                            return null;
                        }
                        return rpcResponse.getResult();
                    }
                }
        );
    }


    private RpcResponse send(RpcRequest rpcRequest,String host,int port){
        log.info("send begin: "+host+":"+port);
        //客戶端執行緒為1即可
        EventLoopGroup group=new NioEventLoopGroup(1);
        try {
            //建立RPC連線
            Bootstrap bootstrap=new Bootstrap();
            bootstrap.group(group);
            bootstrap.channel(NioSocketChannel.class);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() {
                @Override
                protected void initChannel(SocketChannel channel) throws Exception {
                    ChannelPipeline pipeline=channel.pipeline();
                    pipeline.addLast(new RpcEncoder(RpcRequest.class))
                            .addLast(new RpcDecoder(RpcResponse.class))
                            .addLast(new RpcClientHandler(responseMap));
                }
            });
            ChannelFuture future=bootstrap.connect(host,port).sync();
            log.info("requestId: "+rpcRequest.getRequestId());
            //寫入RPC請求物件
            Channel channel=future.channel();
            channel.writeAndFlush(rpcRequest).sync();
            channel.closeFuture().sync();
            log.info("send end");
            //獲取RPC響應物件
            return responseMap.get(rpcRequest.getRequestId());
        }catch (Exception e){
            log.error("client exception",e);
            return null;
        }finally {
            group.shutdownGracefully();
            //移除請求編號和響應物件直接的對映關係
            responseMap.remove(rpcRequest.getRequestId());
        }

    }

}

六 RpcClientHandler

RpcClientHandler 只是用來接收資料,並不需要做什麼處理,這裡的作用就是將 RpcResponse 存入 responseMap 。

/**
 * @version V1.0
 * @author: lin_shen
 * @date: 2018/11/1
 * @Description: TODO
 */
@Log4j2
public class RpcClientHandler extends SimpleChannelInboundHandler<RpcResponse> {

    private ConcurrentMap<String,RpcResponse> responseMap;

    public RpcClientHandler(ConcurrentMap<String,RpcResponse> responseMap){
        this.responseMap=responseMap;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RpcResponse msg) throws Exception {
        log.info("read a Response,requestId: "+msg.getRequestId());
        responseMap.put(msg.getRequestId(),msg);
    }
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        log.error("client caught exception",cause);
        ctx.close();
    }

}