1. 程式人生 > >基於netty實現rpc框架-spring boot服務端

基於netty實現rpc框架-spring boot服務端

demo地址

https://gitee.com/syher/grave-netty

 

RPC介紹

首先了解一下RPC:遠端過程呼叫。簡單點說就是本地應用可以呼叫遠端伺服器的介面。那麼通過什麼方式呼叫遠端介面呢?說白了RPC只是一種概念。他的呼叫可以基於HTTP實現,也可以基於TCP/IP實現。甚至私人定製的通訊協議。

當然,私人定製通訊協議成本過高且不具備通用性。我們不做展開討論(其實我也展不開。。。)。那為什麼不使用HTTP協議呢?受限於HTTP協議層級過高,資料傳輸效率不如TCP/IP。所以RPC遠端呼叫一般採用TCP/IP實現。即呼叫socket方法。

 

RPC實現原理

1. 客戶端發起遠端服務呼叫。

2. 客戶端將類資訊、呼叫方法和入參資訊通過socket通道傳送給服務端。

3. 服務端解析資料包,呼叫本地介面。

5.將執行結果通過socket返回給客戶端。

6.客戶端拿到並解析返回結果。

 

RPC實現

java如何實現一個rpc框架,其實就是按照上面的原理再做一些詳細的補充。比如通過動態代理封裝客戶端的資料包、通過反射機制實現服務端實現類的呼叫等等。

今天,我們先基於spring boot + netty 做rpc服務端的實現。

 

首先,做一個註解用於標識介面提供rpc呼叫。

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface Service {
    String name() default "";
}

  

該註解用於提供服務的實現類上。

public interface INettyService {

    String getString();
}

  

其實現類:

package com.braska.grave.netty.server.service;

@Service // 該註解為自定義rpc服務註解
public class NettyService implements INettyService {
    @Override
    public String getString() {
        return "welcome to use netty rpc.";
    }
}

  

接著,定義一個註解用來掃描指定包名下的Service註解。

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({NettyServerScannerRegistrar.class, NettyServerApplicationContextAware.class})
public @interface NettyServerScan {

    String[] basePackages();
}

  

該註解用於spring boot啟動類上,引數basePackages指定服務所在的包路徑。

@SpringBootApplication
@NettyServerScan(basePackages = {
        "com.braska.grave.netty.server.service"
})
public class GraveNettyServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(GraveNettyServerApplication.class, args);
    }

}

  

NettyServerScannerRegistrar類處理服務的spring bean註冊。

public class NettyServerScannerRegistrar implements BeanFactoryAware, ImportBeanDefinitionRegistrar, ResourceLoaderAware {
    

    @Override
    public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) {
     
    // 建立掃描器例項 NettyServerInterfaceScanner scanner = new NettyServerInterfaceScanner(registry); if (this.resourceLoader != null) { scanner.setResourceLoader(this.resourceLoader); } AnnotationAttributes annoAttrs = AnnotationAttributes.fromMap(importingClassMetadata.getAnnotationAttributes(NettyServerScan.class.getName())); List<String> basePackages = new ArrayList<String>(); for (String pkg : annoAttrs.getStringArray("basePackages")) { if (StringUtils.hasText(pkg)) { basePackages.add(pkg); } }      // 只掃描指定的註解。 scanner.setAnnotationClass(Service.class); scanner.registerFilters();

     // 將basePackages裡面的通過@Service註解的類註冊成spring bean。 scanner.doScan(StringUtils.toStringArray(basePackages)); } }

  

NettyServerApplicationContextAware類,暴露socket server埠。

public class NettyServerApplicationContextAware implements ApplicationContextAware, InitializingBean {
    private static final Logger logger = Logger.getLogger(NettyServerApplicationContextAware.class.getName());

   // 儲存介面與實現類的對映,其中key是介面名。value是實現類的bean。 private Map<String, Object> serviceMap = new HashMap<>();

   // 服務worker。包含netty socket服務端生命週期及讀寫。 ServerWorker runner; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { String address = applicationContext.getEnvironment().getProperty("remoteAddress"); Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Service.class); for (Object serviceBean : beans.values()) { Class<?> clazz = serviceBean.getClass(); Class<?>[] interfaces = clazz.getInterfaces(); for (Class<?> inter : interfaces) { String interfaceName = inter.getName(); serviceMap.put(interfaceName, serviceBean); } }

     // 建立netty worker物件 runner = new ServerWorker(address, serviceMap); } @Override public void afterPropertiesSet() throws Exception {

     // 建立netty socketServer及通道處理器 runner.open(); } }

  

ServerWorker類的open方法。

public class ServerWorker extends ChannelInitializer {

   // socket ip:port private String remoteAddress;

// 實現類的beanMap private Map<String, Object> serviceMap;
// netty channel處理器 NettyServerHandler handler;public void open() { try { int parallel = Runtime.getRuntime().availableProcessors() * 2; ServerBootstrap bootstrap = new ServerBootstrap(); this.bossGroup = new NioEventLoopGroup(); // todo 使用執行緒池,提高併發能力 this.workerGroup = new NioEventLoopGroup(parallel); bootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) .childOption(ChannelOption.SO_KEEPALIVE, true) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(this); String[] hostAndPort = this.remoteAddress.split(":"); if (hostAndPort == null || hostAndPort.length != 2) { throw new RuntimeException("remoteAddress is error."); } ChannelFuture cf = bootstrap.bind(hostAndPort[0], Integer.parseInt(hostAndPort[1])).sync(); // todo 資訊寫入註冊中心 // registry.register(serverAddress); logger.info("netty 伺服器啟動.監聽埠:" + hostAndPort[1]); // 等待服務端監聽埠關閉 cf.channel().closeFuture().sync(); } catch (Exception e) { logger.log(Level.SEVERE, "netty server open failed.", e); this.bossGroup.shutdownGracefully(); this.workerGroup.shutdownGracefully(); } } @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JSONEncoder()); pipeline.addLast(new JSONDecoder()); pipeline.addLast(this.handler); } }

 

NettyServerHandler服務端channel處理器,繼承ChannelInboundHandlerAdapter。

@ChannelHandler.Sharable
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    private Map<String, Object> serviceMap;

    public NettyServerHandler(Map<String, Object> serviceMap) {
        this.serviceMap = serviceMap;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {

     // 解析客戶端傳送過來的資料。包含類名、方法名、入參等資訊。 Request request = JSON.parseObject(msg.toString(), Request.class); Response response = new Response(); response.setRequestId(request.getId()); try {

       // 呼叫本地實現類 Object res = this.handler(request); response.setData(res); } catch (Exception e) { response.setCode(-1); response.setError(e.getMessage()); logger.log(Level.SEVERE, "請求呼叫失敗", e); }

     // 返回處理結果給客戶端 ctx.writeAndFlush(response); } private Object handler(Request request) throws Exception { String className = request.getClassName();

     // 通過className從beanMap對映中找到託管給spring的bean實現類。 Object serviceBean = serviceMap.get(className); String methodName = request.getMethodName(); Object[] parameters = request.getParameters();

     // 通過反射機制呼叫實現類。並返回呼叫結果。 return MethodUtils.invokeMethod(serviceBean, methodName, parameters); } }

  

至此,rpc服務端的實現就完成了。

一路看下來,服務端的程式碼實現還是比較簡單的。核心程式碼只有兩個類:ServerWorker和NettyServerHandler。其餘的都是對spring bean註冊的支