1. 程式人生 > >Netty 實現簡單RPC呼叫

Netty 實現簡單RPC呼叫

RPC,即 Remote Procedure Call(遠端過程呼叫),說得通俗一點就是:呼叫遠端計算機上的服務,就像呼叫本地服務一樣。

RPC 可基於 HTTP 或 TCP 協議,Web Service 就是基於 HTTP 協議的 RPC,它具有良好的跨平臺性,但其效能卻不如基於 TCP 協議的 RPC。會兩方面會直接影響 RPC 的效能,一是傳輸方式,二是序列化。

眾所周知,TCP 是傳輸層協議,HTTP 是應用層協議,而傳輸層較應用層更加底層,在資料傳輸方面,越底層越快,因此,在一般情況下,TCP 一定比 HTTP 快。就序列化而言,Java 提供了預設的序列化方式,但在高併發的情況下,這種方式將會帶來一些效能上的瓶頸,於是市面上出現了一系列優秀的序列化框架,比如:Protobuf、Kryo、Hessian、Jackson 等,它們可以取代 Java 預設的序列化,從而提供更高效的效能。
下面是簡單實現的基於netty的RPC呼叫。

一、首先定義訊息傳遞的實體類

span style="font-size:14px;">public class ClassInfo implements Serializable {  

    private static final long serialVersionUID = -8970942815543515064L;  

    private String className;//類名  
    private String methodName;//函式名稱  
    private Class<?>[] types;//引數型別    
    private
Object[] objects;//引數列表 public String getClassName() { return className; } public void setClassName(String className) { this.className = className; } public String getMethodName() { return methodName; } public void setMethodName
(String methodName) { this.methodName = methodName; } public Class<?>[] getTypes() { return types; } public void setTypes(Class<?>[] types) { this.types = types; } public Object[] getObjects() { return objects; } public void setObjects(Object[] objects) { this.objects = objects; } }

二、建立Netty操作的服務端,以及具體操作
1. 服務端

public class RPCServer {  
    private int port;  
    public RPCServer(int port){  
        this.port = port;  
    }  
    public void start(){  
        EventLoopGroup bossGroup = new NioEventLoopGroup();  
        EventLoopGroup workerGroup = new NioEventLoopGroup();  

        try {  
            ServerBootstrap serverBootstrap = new ServerBootstrap().group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)  
                    .localAddress(port).childHandler(new ChannelInitializer<SocketChannel>() {  

                        @Override  
                        protected void initChannel(SocketChannel ch) throws Exception {  
                            ChannelPipeline pipeline = ch.pipeline();    
                             pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));    
                                pipeline.addLast(new LengthFieldPrepender(4));    
                                pipeline.addLast("encoder", new ObjectEncoder());      
                                pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));    
                                pipeline.addLast(new InvokerHandler());   
                        }  
                    }).option(ChannelOption.SO_BACKLOG, 128)       
                    .childOption(ChannelOption.SO_KEEPALIVE, true);  
            ChannelFuture future = serverBootstrap.bind(port).sync();      
            System.out.println("Server start listen at " + port );    
            future.channel().closeFuture().sync();    
        } catch (Exception e) {  
             bossGroup.shutdownGracefully();    
             workerGroup.shutdownGracefully();  
        }  
    }  
    public static void main(String[] args) throws Exception {    
        int port;    
        if (args.length > 0) {    
            port = Integer.parseInt(args[0]);    
        } else {    
            port = 8080;    
        }    
        new RPCServer(port).start();    
    }    
}  
  1. 服務端操作,由服務端我們看到具體的資料傳輸操作是進行序列化的,具體的操作還是比較簡單的,就是獲取傳送過來的資訊,這樣就可以通過反射獲得類名,根據函式名和引數值,執行具體的操作,將執行結果傳送給客戶端
public class InvokerHandler extends ChannelInboundHandlerAdapter {  
    public static ConcurrentHashMap<String, Object> classMap = new ConcurrentHashMap<String,Object>();  
    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
        ClassInfo classInfo = (ClassInfo)msg;  
        Object claszz = null;  
        if(!classMap.containsKey(classInfo.getClassName())){  
            try {  
                claszz = Class.forName(classInfo.getClassName()).newInstance();  
                classMap.put(classInfo.getClassName(), claszz);  
            } catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {  
                e.printStackTrace();  
            }  
        }else {  
            claszz = classMap.get(classInfo.getClassName());  
        }  
        Method method = claszz.getClass().getMethod(classInfo.getMethodName(), classInfo.getTypes());    
        Object result = method.invoke(claszz, classInfo.getObjects());   
        ctx.write(result);  
        ctx.flush();    
        ctx.close();  
    }    
    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
         cause.printStackTrace();    
         ctx.close();    
    }    

}  

三、客戶端,通過代理機制來觸發遠端呼叫
(1)客戶端,當執行具體的函式時會呼叫遠端操作,將具體操作的類、函式及引數資訊傳送到服務端

public class RPCProxy {  

    @SuppressWarnings("unchecked")  
    public static <T> T create(Object target){  

        return (T) Proxy.newProxyInstance(target.getClass().getClassLoader(),target.getClass().getInterfaces(), new InvocationHandler(){  

            @Override  
            public Object invoke(Object proxy, Method method, Object[] args)  
                        throws Throwable {  

                ClassInfo classInfo = new ClassInfo();  
                classInfo.setClassName(target.getClass().getName());  
                classInfo.setMethodName(method.getName());  
                classInfo.setObjects(args);  
                classInfo.setTypes(method.getParameterTypes());  

                ResultHandler resultHandler = new ResultHandler();  
                EventLoopGroup group = new NioEventLoopGroup();    
                try {    
                    Bootstrap b = new Bootstrap();    
                    b.group(group)    
                     .channel(NioSocketChannel.class)    
                     .option(ChannelOption.TCP_NODELAY, true)    
                     .handler(new ChannelInitializer<SocketChannel>() {    
                         @Override    
                         public void initChannel(SocketChannel ch) throws Exception {    
                             ChannelPipeline pipeline = ch.pipeline();    
                             pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));    
                             pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));    
                             pipeline.addLast("encoder", new ObjectEncoder());      
                             pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));    
                             pipeline.addLast("handler",resultHandler);  
                         }    
                     });    

                    ChannelFuture future = b.connect("localhost", 8080).sync();    
                    future.channel().writeAndFlush(classInfo).sync();  
                    future.channel().closeFuture().sync();    
                } finally {    
                    group.shutdownGracefully();    
                }  
                return resultHandler.getResponse();  
            }  
        });  
    }  
}  
  1. 獲取遠端呼叫返回的結果值
public class ResultHandler extends ChannelInboundHandlerAdapter {  

    private Object response;    

    public Object getResponse() {    
    return response;    
}    

    @Override    
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {    
        response=msg;    
        System.out.println("client接收到伺服器返回的訊息:" + msg);    
    }    

    @Override    
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {    
        System.out.println("client exception is general");    
    }    
}  

四、介面、實現類及Main操作

public interface HelloRpc {  
    String hello(String name);  
}  
public class HelloRpcImpl implements HelloRpc {  

    @Override  
    public String hello(String name) {  
        return "hello "+name;  
    }  

}