1. 程式人生 > >基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇

基於Netty和SpringBoot實現一個輕量級RPC框架-Server篇

前提

前置文章:

  • Github Page:《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》
  • Coding Page:《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》

在前置的《基於Netty和SpringBoot實現一個輕量級RPC框架-協議篇》一文中已經定義了一個相對簡單的RPC私有協議,並且實現了對應的編碼和解碼模組。這篇文章基於協議篇,完成Server端程式碼呼叫的編寫。考慮到目前相對主流的IOC容器是Spring,這裡選用了spring-boot-starter(非MVC容器,只是單純管理Bean),依賴JDK1.8+

思路

首先RPC

私有協議定義了Client端會傳過來四個和服務呼叫息息相關的字元:介面全類名interfaceName、方法名methodName、方法引數簽名字串陣列methodArgumentSignatures(可選,這個引數不是必須傳入的)以及方法引數陣列methodArguments(可選,空方法列表的時候不需要傳入引數)。主要流程如下:

  • Server端的所有服務端(實現)類交由IOC容器託管。
  • Client端發起RPC請求。
  • 通過前面提到的最多四個引數,從Server服務例項的IOC容器中匹配出吻合度最高的一個方法java.lang.reflect.Method例項、該方法例項的宿主類以及宿主類對應的Bean
    例項,如果這一步匹配的目標方法超過1個或者為0個,可以直接返回異常資訊。
  • 把前一步得到的Method例項、宿主類Bean例項,結合方法引數陣列methodArguments進行反射呼叫,得到呼叫結果。
  • Server端把響應結果封裝到payload通過私有協議傳送回Client端。

Server端程式碼實現

為了暫時方便起見,部分陣列入參被重新封裝為ArrayList,實際上編寫RPC框架的時候應該優先考慮效能問題,像JDK提供的集合類庫等等應該儘可能少用(以ArrayList為例,擴容的時候存在底層Object[]拷貝,造成效能損失和額外的記憶體消耗),極盡可能使用基本型別和陣列。

先定義方法匹配器MethodMatcher

相關的類:

public interface MethodMatcher {

    /**
     * 查詢一個匹配度最高的方法資訊
     *
     * @param input input
     * @return output
     */
    MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input);
}

// 輸入值
@EqualsAndHashCode
@Data
public class MethodMatchInput {

    private String interfaceName;

    private String methodName;

    private List<String> methodArgumentSignatures;

    private int methodArgumentArraySize;
}

// 輸出值
@Data
public class MethodMatchOutput {

    /**
     * 目標方法例項
     */
    private Method targetMethod;

    /**
     * 目標實現類 - 這個有可能是被Cglib增強過的型別,是宿主類的子類,如果沒有被Cglib增強過,那麼它就是宿主類
     */
    private Class<?> targetClass;

    /**
     * 宿主類
     */
    private Class<?> targetUserClass;

    /**
     * 宿主類Bean例項
     */
    private Object target;

    /**
     * 方法引數型別列表
     */
    private List<Class<?>> parameterTypes;
}

目標方法匹配的邏輯大致如下:

  1. 方法名稱和方法例項的宿主型別一定作為匹配條件的一部分。
  2. 如果傳入了引數簽名列表,優先使用引數簽名列表型別進行匹配。
  3. 如果沒有傳入引數簽名列表,那麼使用引數的數量進行匹配。
  4. 如果引數簽名列表和引數列表都沒有傳入,那麼只能通過方法名稱和方法例項的宿主型別匹配。
  5. 考慮到方法匹配解析的過程相對耗時,需要把結果快取起來。

分析至此,可以基於反射,編寫一個抽象的方法匹配器BaseMethodMatcher,然後把獲取宿主類資訊的功能委託到子類:

public class MethodMatchException extends RuntimeException {

    public MethodMatchException(String message) {
        super(message);
    }

    public MethodMatchException(String message, Throwable cause) {
        super(message, cause);
    }

    public MethodMatchException(Throwable cause) {
        super(cause);
    }
}

@Data
public class HostClassMethodInfo {

    private Class<?> hostClass;
    private Class<?> hostUserClass;
    private Object hostTarget;
}

@Slf4j
abstract class BaseMethodMatcher implements MethodMatcher {

    private final ConcurrentMap<MethodMatchInput, MethodMatchOutput> cache = Maps.newConcurrentMap();

    @Override
    public MethodMatchOutput selectOneBestMatchMethod(MethodMatchInput input) {
        return cache.computeIfAbsent(input, in -> {
            try {
                MethodMatchOutput output = new MethodMatchOutput();
                Class<?> interfaceClass = Class.forName(in.getInterfaceName());
                // 獲取宿主類資訊
                HostClassMethodInfo info = findHostClassMethodInfo(interfaceClass);
                List<Method> targetMethods = Lists.newArrayList();
                ReflectionUtils.doWithMethods(info.getHostUserClass(), targetMethods::add, method -> {
                    String methodName = method.getName();
                    Class<?> declaringClass = method.getDeclaringClass();
                    List<Class<?>> inputParameterTypes = Optional.ofNullable(in.getMethodArgumentSignatures())
                            .map(mas -> {
                                List<Class<?>> list = Lists.newArrayList();
                                mas.forEach(ma -> list.add(ClassUtils.resolveClassName(ma, null)));
                                return list;
                            }).orElse(Lists.newArrayList());
                    output.setParameterTypes(inputParameterTypes);
                    // 如果傳入了引數簽名列表,優先使用引數簽名列表型別進行匹配
                    if (!inputParameterTypes.isEmpty()) {
                        List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
                        return Objects.equals(methodName, in.getMethodName()) &&
                                Objects.equals(info.getHostUserClass(), declaringClass) &&
                                Objects.equals(parameterTypes, inputParameterTypes);
                    }
                    // 如果沒有傳入引數簽名列表,那麼使用引數的數量進行匹配
                    if (in.getMethodArgumentArraySize() > 0) {
                        List<Class<?>> parameterTypes = Lists.newArrayList(method.getParameterTypes());
                        return Objects.equals(methodName, in.getMethodName()) &&
                                Objects.equals(info.getHostUserClass(), declaringClass) &&
                                in.getMethodArgumentArraySize() == parameterTypes.size();

                    }
                    // 如果引數簽名列表和引數列表都沒有傳入,那麼只能通過方法名稱和方法例項的宿主型別匹配
                    return Objects.equals(methodName, in.getMethodName()) &&
                            Objects.equals(info.getHostUserClass(), declaringClass);

                });
                if (targetMethods.size() != 1) {
                    throw new MethodMatchException(String.format("查詢到目標方法數量不等於1,interface:%s,method:%s",
                            in.getInterfaceName(), in.getMethodName()));
                }
                Method targetMethod = targetMethods.get(0);
                output.setTargetClass(info.getHostClass());
                output.setTargetMethod(targetMethod);
                output.setTargetUserClass(info.getHostUserClass());
                output.setTarget(info.getHostTarget());
                return output;
            } catch (Exception e) {
                log.error("查詢匹配度最高的方法失敗,輸入引數:{}", JSON.toJSONString(in), e);
                if (e instanceof MethodMatchException) {
                    throw (MethodMatchException) e;
                } else {
                    throw new MethodMatchException(e);
                }
            }
        });
    }

    /**
     * 獲取宿主類的資訊
     *
     * @param interfaceClass interfaceClass
     * @return HostClassMethodInfo
     */
    abstract HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass);
}

接著,通過介面型別獲取宿主類的功能就委託給Spring實現,從IOC容器中獲取,定義SpringMethodMatcher

@Component
public class SpringMethodMatcher extends BaseMethodMatcher implements BeanFactoryAware {

    private DefaultListableBeanFactory beanFactory;

    @Override
    public void setBeanFactory(@NonNull BeanFactory beanFactory) throws BeansException {
        this.beanFactory = (DefaultListableBeanFactory) beanFactory;
    }

    @Override
    HostClassMethodInfo findHostClassMethodInfo(Class<?> interfaceClass) {
        HostClassMethodInfo info = new HostClassMethodInfo();
        // 從容器中通過介面型別獲取對應的實現,實現必須只有一個
        Object bean = beanFactory.getBean(interfaceClass);
        info.setHostTarget(bean);
        info.setHostClass(bean.getClass());
        info.setHostUserClass(ClassUtils.getUserClass(bean.getClass()));
        return info;
    }
}

至此,目標方法匹配的模組已經編寫完畢,接下來需要處理方法引數列表的反序列化。編寫協議的時候,筆者把方法引數列表methodArguments存放在Object陣列中,傳輸的時候序列化為byte陣列,經過協議解析之後,方法引數列表的實際型別為ByteBuf陣列(這是因為Netty中的位元組容器就是ByteBuf),那麼需要考慮把ByteBuf陣列轉換為目標方法的引數型別例項。主要步驟如下:

  1. 如果方法引數列表為空,那麼什麼都不用做,也就是呼叫了無引數的方法。
  2. 如果方法引數列表不為空同時方法引數型別列表不為空,優先選用方法引數型別列表進行轉換。
  3. 如果方法引數列表不為空同時方法引數型別列表為空,則使用Method#getParameterTypes()得到的方法引數列表型別進行轉換。

定義一個方法引數轉換器介面MethodArgumentConverter

public interface MethodArgumentConverter {

    ArgumentConvertOutput convert(ArgumentConvertInput input);
}

@Data
public class ArgumentConvertInput {

    /**
     * 目標方法
     */
    private Method method;

    /**
     * 方法引數型別列表
     */
    private List<Class<?>> parameterTypes;

    /**
     * 方法引數列表
     */
    private List<Object> arguments;
}

@Data
public class ArgumentConvertOutput {


    private Object[] arguments;
}

方法引數轉換器的預設實現如下:

@Slf4j
@Component
public class DefaultMethodArgumentConverter implements MethodArgumentConverter {

    private final Serializer serializer = FastJsonSerializer.X;

    @Override
    public ArgumentConvertOutput convert(ArgumentConvertInput input) {
        ArgumentConvertOutput output = new ArgumentConvertOutput();
        try {
            if (null == input.getArguments() || input.getArguments().isEmpty()) {
                output.setArguments(new Object[0]);
                return output;
            }
            List<Class<?>> inputParameterTypes = input.getParameterTypes();
            int size = inputParameterTypes.size();
            if (size > 0) {
                Object[] arguments = new Object[size];
                for (int i = 0; i < size; i++) {
                    ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
                    int readableBytes = byteBuf.readableBytes();
                    byte[] bytes = new byte[readableBytes];
                    byteBuf.readBytes(bytes);
                    arguments[i] = serializer.decode(bytes, inputParameterTypes.get(i));
                    byteBuf.release();
                }
                output.setArguments(arguments);
                return output;
            }
            Class<?>[] parameterTypes = input.getMethod().getParameterTypes();
            int len = parameterTypes.length;
            Object[] arguments = new Object[len];
            for (int i = 0; i < len; i++) {
                ByteBuf byteBuf = (ByteBuf) input.getArguments().get(i);
                int readableBytes = byteBuf.readableBytes();
                byte[] bytes = new byte[readableBytes];
                byteBuf.readBytes(bytes);
                arguments[i] = serializer.decode(bytes, parameterTypes[i]);
                byteBuf.release();
            }
            output.setArguments(arguments);
            return output;
        } catch (Exception e) {
            throw new ArgumentConvertException(e);
        }
    }
}    

所有前置工作都完成了,現在編寫一個Server端的入站處理器ServerHandler,暫時不做程式碼邏輯優化,只做實現,把反射呼叫的模組直接在此類中編寫:

@Component
@Slf4j
public class ServerHandler extends SimpleChannelInboundHandler<RequestMessagePacket> {

    @Autowired
    private MethodMatcher methodMatcher;

    @Autowired
    private MethodArgumentConverter methodArgumentConverter;

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RequestMessagePacket packet) throws Exception {
        log.info("服務端接收到:{}", packet);
        MethodMatchInput input = new MethodMatchInput();
        input.setInterfaceName(packet.getInterfaceName());
        input.setMethodArgumentSignatures(Optional.ofNullable(packet.getMethodArgumentSignatures())
                .map(Lists::newArrayList).orElse(Lists.newArrayList()));
        input.setMethodName(packet.getMethodName());
        Object[] methodArguments = packet.getMethodArguments();
        input.setMethodArgumentArraySize(null != methodArguments ? methodArguments.length : 0);
        MethodMatchOutput output = methodMatcher.selectOneBestMatchMethod(input);
        log.info("查詢目標實現方法成功,目標類:{},宿主類:{},宿主方法:{}",
                output.getTargetClass().getCanonicalName(),
                output.getTargetUserClass().getCanonicalName(),
                output.getTargetMethod().getName()
        );
        Method targetMethod = output.getTargetMethod();
        ArgumentConvertInput convertInput = new ArgumentConvertInput();
        convertInput.setArguments(input.getMethodArgumentArraySize() > 0 ? Lists.newArrayList(methodArguments) : Lists.newArrayList());
        convertInput.setMethod(output.getTargetMethod());
        convertInput.setParameterTypes(output.getParameterTypes());
        ArgumentConvertOutput convertOutput = methodArgumentConverter.convert(convertInput);
        ReflectionUtils.makeAccessible(targetMethod);
        // 反射呼叫
        Object result = targetMethod.invoke(output.getTarget(), convertOutput.getArguments());
        ResponseMessagePacket response = new ResponseMessagePacket();
        response.setMagicNumber(packet.getMagicNumber());
        response.setVersion(packet.getVersion());
        response.setSerialNumber(packet.getSerialNumber());
        response.setAttachments(packet.getAttachments());
        response.setMessageType(MessageType.RESPONSE);
        response.setErrorCode(200L);
        response.setMessage("Success");
        response.setPayload(JSON.toJSONString(result));
        log.info("服務端輸出:{}", JSON.toJSONString(response));
        ctx.writeAndFlush(response);
    }
}

編寫一個Server的啟動類ServerApplication,在Spring容器啟動之後,啟動Netty服務:

@SpringBootApplication(scanBasePackages = "club.throwable.server")
@Slf4j
public class ServerApplication implements CommandLineRunner {

    @Value("${netty.port:9092}")
    private Integer nettyPort;

    @Autowired
    private ServerHandler serverHandler;

    public static void main(String[] args) throws Exception {
        SpringApplication.run(ServerApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        int port = nettyPort;
        ServerBootstrap bootstrap = new ServerBootstrap();
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            bootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {

                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(1024, 0, 4, 0, 4));
                            ch.pipeline().addLast(new LengthFieldPrepender(4));
                            ch.pipeline().addLast(new RequestMessagePacketDecoder());
                            ch.pipeline().addLast(new ResponseMessagePacketEncoder(FastJsonSerializer.X));
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            ChannelFuture future = bootstrap.bind(port).sync();
            log.info("啟動NettyServer[{}]成功...", port);
            future.channel().closeFuture().sync();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }
}

最後,編寫契約包和契約實現:

- ch0-custom-rpc-protocol          專案根目錄
  - club.throwable
    - utils                        工具類
    - protocol                     協議
    - exception                    異常
    - contract                     契約
      - HelloService               契約介面
    - server                       服務端
      - contract
        - DefaultHelloService      契約介面實現
public interface HelloService {

    String sayHello(String name);
}

// 實現
@Service
public class DefaultHelloService implements HelloService {

    @Override
    public String sayHello(String name) {
        return String.format("%s say hello!", name);
    }
}

先啟動服務端ServerApplication,再啟動上一節提到的TestProtocolClient,輸出結果:

// 服務端日誌
2020-01-15 00:05:57.898  INFO 14420 --- [           main] club.throwable.server.ServerApplication  : 啟動NettyServer[9092]成功...
2020-01-15 00:06:05.980  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 服務端接收到:RequestMessagePacket(interfaceName=club.throwable.contract.HelloService, methodName=sayHello, methodArgumentSignatures=[java.lang.String], methodArguments=[PooledUnsafeDirectByteBuf(ridx: 0, widx: 6, cap: 6/139)])
2020-01-15 00:06:07.448  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 查詢目標實現方法成功,目標類:club.throwable.server.contract.DefaultHelloService,宿主類:club.throwable.server.contract.DefaultHelloService,宿主方法:sayHello
2020-01-15 00:06:07.521  INFO 14420 --- [ntLoopGroup-3-1] club.throwable.server.ServerHandler      : 服務端輸出:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}

// 客戶端日誌
00:06:05.891 [main] INFO club.throwable.protocol.TestProtocolClient - 啟動NettyClient[9092]成功...
...省略...
00:06:13.197 [nioEventLoopGroup-2-1] INFO club.throwable.protocol.TestProtocolClient - 接收到來自服務端的響應訊息,訊息內容:{"attachments":{},"errorCode":200,"magicNumber":10086,"message":"Success","messageType":"RESPONSE","payload":"\"doge say hello!\"","serialNumber":"65f01b8e89bb479b8a36a60bd6519617","version":1}

可見RPC呼叫成功。

小結

編寫RPCServer端技巧在於處理目標方法和宿主類的查詢,在轉換方法引數的時候,需要考慮簡化處理和提高效率,剩下的就是做好異常處理和模組封裝。限於篇幅,後面會先分析Client端的處理,再分析心跳處理、服務端優化、甚至是對接註冊中心等等,在NettySpringBoot等優秀框架的加持下編寫一個RPC框架其實並不困難,困難的是效能優化和生態圈的支援。

Demo專案地址:

  • ch0-custom-rpc-protocol

(本文完 c-1-d e-a-20200115