1. 程式人生 > >netty最佳實踐之資料通訊(轉載)

netty最佳實踐之資料通訊(轉載)

一、背景描述

使用Netty進行兩臺或者多臺伺服器之間的資料通訊,大體有以下三種情況:

  1. 使用長連線通道不斷開的方式進行通訊。也就是伺服器和客戶端的通道不斷開,一直處於開啟狀態,如果伺服器的效能足夠好,並且我們的客戶端數量足夠少的情況下,推薦這種方式。
  2. 一次性批量提交資料,推薦採用短連線方式。即我們可以把資料儲存在本地臨時緩衝區或者臨時表中,當到達一定臨界值的時候一次性批量提交,或者是根據定時任務輪詢提交,這種方式的弊端是做不到實時性,在對實時性要求不高的應用程式中推薦使用。
  3. 我們可以使用一種特殊的長連線,在指定的一段時間內,伺服器與某臺客戶端麼有進行任何通訊,則斷開連線。下次客戶端向伺服器端傳送請求時再次建立連線,

3這種模式需要考慮兩種因素:

  • 3.1 如何在伺服器端和客戶端在一定時間超時後關閉通道?關閉通道後如何再建立連線? 答案:可以使用nettyReadTimeoutHandler,在一定時間內沒讀取到資料則斷開連線;再次建立連線直接發起請求即可。

  • 3.2 客戶端宕機時我們無需考慮,下次客戶端重啟後就可以與伺服器建立連線;但是伺服器宕機後,我們客戶端如何與服務端建立連線? 答案無非是隔一段事件輪詢建立連線。

二、程式碼示例

nettyReadTimeOut實現方案3

  • 服務端程式碼
import io.netty.bootstrap.ServerBootstrap;
import
io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.ReadTimeoutHandler; public class Server { public static void main(String[] args) throws Exception{ EventLoopGroup pGroup = new NioEventLoopGroup(); EventLoopGroup cGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(pGroup, cGroup) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) //設定日誌 .handler(new LoggingHandler(LogLevel.INFO)) .childHandler(new ChannelInitializer<SocketChannel>() { protected void initChannel(SocketChannel sc) throws Exception { sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder()); sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder()); //設定服務端的超時時間 sc.pipeline().addLast(new ReadTimeoutHandler(5)); sc.pipeline().addLast(new ServerHandler()); } }); ChannelFuture cf = b.bind(8765).sync(); cf.channel().closeFuture().sync(); pGroup.shutdownGracefully(); cGroup.shutdownGracefully(); } }
  • ServerHandler
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
public class ServerHandler extends ChannelHandlerAdapter{
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {

    }
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Request request = (Request)msg;
        System.out.println("Server : " + request.getId() + ", " + request.getName() + ", " + request.getRequestMessage());
        Response response = new Response();
        response.setId(request.getId());
        response.setName("response" + request.getId());
        response.setResponseMessage("響應內容" + request.getId());
        ctx.writeAndFlush(response);//.addListener(ChannelFutureListener.CLOSE);
    }
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        
    }
   @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }    
}
  • client
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;
import io.netty.handler.timeout.ReadTimeoutHandler;

import java.util.concurrent.TimeUnit;
public class Client {  
    private static class SingletonHolder {
        static final Client instance = new Client();
    }
    
    public static Client getInstance(){
        return SingletonHolder.instance;
    }
    
    private EventLoopGroup group;
    private Bootstrap b;
    private ChannelFuture cf ;
    
    private Client(){
            group = new NioEventLoopGroup();
            b = new Bootstrap();
            b.group(group)
             .channel(NioSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel sc) throws Exception {
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingDecoder());
                        sc.pipeline().addLast(MarshallingCodeCFactory.buildMarshallingEncoder());
                        //超時handler(當伺服器端與客戶端在指定時間以上沒有任何進行通訊,則會關閉響應的通道,主要為減小服務端資源佔用)
                        sc.pipeline().addLast(new ReadTimeoutHandler(5)); 
                        sc.pipeline().addLast(new ClientHandler());
                    }
            });
    }
    
    public void connect(){
        try {
            this.cf = b.connect("127.0.0.1", 8765).sync();
            System.out.println("遠端伺服器已經連線, 可以進行資料交換..");                
        } catch (Exception e) {
            e.printStackTrace();
        }
    }    
    public ChannelFuture getChannelFuture(){
        
        if(this.cf == null){
            this.connect();
        }
        if(!this.cf.channel().isActive()){
            this.connect();
        }        
        return this.cf;
    }  
    public static void main(String[] args) throws Exception{
        final Client c = Client.getInstance();
        //c.connect();
        
        ChannelFuture cf = c.getChannelFuture();
        //客戶端每隔4s鍾向伺服器端發資料:
        for(int i = 1; i <= 3; i++ ){
            Request request = new Request();
            request.setId("" + i);
            request.setName("pro" + i);
            request.setRequestMessage("資料資訊" + i);
            cf.channel().writeAndFlush(request);
            TimeUnit.SECONDS.sleep(4);
        }
        cf.channel().closeFuture().sync();      
        //讓客戶端斷開後可以重新連線上
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    System.out.println("進入子執行緒...");
                    ChannelFuture cf = c.getChannelFuture();
                    System.out.println(cf.channel().isActive());
                    System.out.println(cf.channel().isOpen());                 
                    //再次傳送資料
                    Request request = new Request();
                    request.setId("" + 4);
                    request.setName("pro" + 4);
                    request.setRequestMessage("資料資訊" + 4);
                    cf.channel().writeAndFlush(request);                    
                    cf.channel().closeFuture().sync();
                    System.out.println("子執行緒結束.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();       
        System.out.println("斷開連線,主執行緒結束..");      
    }    
}

main方法,可以發現for(int i = 1; i <= 3; i++ ) 這個迴圈中,每個迴圈停頓4秒,也就是每隔4秒傳送一次請求,而伺服器端的超時時間設定為5秒,那麼在這個for迴圈期間連線是不會斷開的,等for迴圈結束cf.channel().closeFuture().sync(); 斷開連線this.cf.channel().isActive() 變為否,在new Thread()中再次傳送請求,getChannelFuture會重新建立連線

  • clientHandler
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.ReferenceCountUtil;
public class ClientHandler extends ChannelHandlerAdapter{   
   @Override
   public void channelActive(ChannelHandlerContext ctx) throws Exception {

   }
   @Override
   public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
       try {
           Response resp = (Response)msg;
           System.out.println("Client : " + resp.getId() + ", " + resp.getName() + ", " + resp.getResponseMessage());            
       } finally {
           ReferenceCountUtil.release(msg);
       }
   }
   @Override
   public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
       
   }
   @Override
   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       ctx.close();
   }    
}
  • Marshalling工廠
import io.netty.handler.codec.marshalling.DefaultMarshallerProvider;
import io.netty.handler.codec.marshalling.DefaultUnmarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallerProvider;
import io.netty.handler.codec.marshalling.MarshallingDecoder;
import io.netty.handler.codec.marshalling.MarshallingEncoder;
import io.netty.handler.codec.marshalling.UnmarshallerProvider;

import org.jboss.marshalling.MarshallerFactory;
import org.jboss.marshalling.Marshalling;
import org.jboss.marshalling.MarshallingConfiguration;
public final class MarshallingCodeCFactory {
    /**
     * 建立Jboss Marshalling解碼器MarshallingDecoder
     * @return MarshallingDecoder
     */
    public static MarshallingDecoder buildMarshallingDecoder() {
        //首先通過Marshalling工具類的精通方法獲取Marshalling例項物件 引數serial標識建立的是java序列化工廠物件。
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        //建立了MarshallingConfiguration物件,配置了版本號為5 
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        //根據marshallerFactory和configuration建立provider
        UnmarshallerProvider provider = new DefaultUnmarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingDecoder物件,倆個引數分別為provider和單個訊息序列化後的最大長度
        MarshallingDecoder decoder = new MarshallingDecoder(provider, 1024);
        return decoder;
    }

    /**
     * 建立Jboss Marshalling編碼器MarshallingEncoder
     * @return MarshallingEncoder
     */
    public static MarshallingEncoder buildMarshallingEncoder() {
        final MarshallerFactory marshallerFactory = Marshalling.getProvidedMarshallerFactory("serial");
        final MarshallingConfiguration configuration = new MarshallingConfiguration();
        configuration.setVersion(5);
        MarshallerProvider provider = new DefaultMarshallerProvider(marshallerFactory, configuration);
        //構建Netty的MarshallingEncoder物件,MarshallingEncoder用於實現序列化介面的POJO物件序列化為二進位制陣列
        MarshallingEncoder encoder = new MarshallingEncoder(provider);
        return encoder;
    }
}
  • 其餘的兩個傳輸物件
public class Request implements Serializable{

	private static final long  SerialVersionUID = 1L;
	
	private String id ;
	private String name ;
	private String requestMessage ;
	……
}
public class Response implements Serializable{
	
	private static final long serialVersionUID = 1L;
	
	private String id;
	private String name;
	private String responseMessage;
	……
}

參考