1. 程式人生 > >【Netty】ChannelHandler和codec

【Netty】ChannelHandler和codec

creat int simple exception 代碼 大型 and sed log

一、前言

  前面學習了Netty的codec框架,下面接著學習ChannelHandler與codec之間的關聯。

二、ChannelHandler和codec

  Netty為不同的協議提供了處理器和編解碼器,可以開箱即用,這些工具支持SSL / TLS和WebSocket,以及通過數據壓縮使得HTTP有更好的性能。

  2.1 使用SSL/TLS保護Netty應用程序

  由於數據隱私非常重要,而SSL和TLS等加密協議用於處理數據隱私,這些協議在其他協議之上以實現數據安全性。安全網站會使用這種協議,同時,不基於HTTP的應用程序,如安全SMTP(SMTPS)郵件服務甚至關系數據庫系統也會用到這些協議。

  為支持SSL/TLS協議,Java提供了javax.net.ssl包,其SSLContext和SSLEngine類使得實現解密和加密變得相當直接容易。而Netty使用SslHandler的ChannelHandler實現,其內部是使用SSLEngine來進行實際工作。

  下圖展示了通過SslHandler的數據流。

  技術分享

  下面代碼展示如何通過ChannelInitializer將SslHandler添加至ChannelPipeline中。 

public class SslChannelInitializer extends ChannelInitializer<Channel>{
    
private final SslContext context; private final boolean startTls; public SslChannelInitializer(SslContext context, boolean startTls) { this.context = context; this.startTls = startTls; } @Override protected void initChannel(Channel ch) throws
Exception { SSLEngine engine = context.newEngine(ch.alloc()); ch.pipeline().addFirst("ssl",   new SslHandler(engine, startTls)); } }

  在大多數情況下,SslHandler都是ChannelPipeline中的第一個ChannelHandler,只有當所有其他ChannelHandler將其邏輯應用於數據之後,才能進行加密操作。

  2.2 構建Netty的HTTP/HTTPS應用程序

  HTTP/HTTPS是最常見的協議套件之一,許多Web Service API都是基於HTTP/HTTPS的。

  1. HTTP的decoder、encoder、codec

  HTTP基於請求/響應模式:客戶端發送HTTP請求至服務端,服務端響應HTTP請求至客戶端。Netty提供各種編碼器和解碼器以簡化使用此協議。

  下圖展示了HTTP請求的組成部分。

  技術分享

  而下圖展示了HTTP響應的組成部分。

  技術分享

  下面代碼的HttpPipelineInitializer可以為應用提供HTTP支持,僅僅只需要往ChannelPipeline中添加正確的ChannelHandler即可。 

public class HttpPipelineInitializer extends ChannelInitializer<Channel> {
    private final boolean client;
    public HttpPipelineInitializer(boolean client) {
        this.client = client;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (client) {
            pipeline.addLast("decoder", new HttpResponseDecoder());
            pipeline.addLast("encoder", new HttpRequestEncoder());
        } else {
            pipeline.addLast("decoder", new HttpRequestDecoder());
            pipeline.addLast("encoder", new HttpResponseEncoder());
        }
    }
}

  2. HTTP消息聚合

  當管道中的處理器被初始化後,就可以處理不同的消息,但由於HTTP請求和響應由多部分組成,需要將這些部分匯聚成完整的結果,Netty提供了一個合成器,其可以消息的不同部分組合成FullHttpRequest和FullHttpResponse。由於消息在完整之前需要被緩存,因此其會帶來小小的開銷,但是不用擔心消息的碎片化處理。

  下面代碼展示了如何自動化裝配消息。

public class HttpAggregatorInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    public HttpAggregatorInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
        pipeline.addLast("aggregator",
            new HttpObjectAggregator(512 * 1024));
    }
}

  3. HTTP壓縮

  使用HTTP時,建議采用壓縮來盡可能減少傳輸數據的大小。Netty為壓縮和解壓縮提供了ChannelHandler的實現,其支持gzip和deflate編碼。

  如下代碼展示了如何自動壓縮HTTP消息。  

public class HttpCompressionInitializer extends ChannelInitializer<Channel> {
    private final boolean isClient;
    public HttpCompressionInitializer(boolean isClient) {
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
            pipeline.addLast("decompressor",
            new HttpContentDecompressor());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
            pipeline.addLast("compressor",
                new HttpContentCompressor());
        }
    }
}

  4. 使用HTTPS

  當混合了SslHandler時就可以使用HTTPS,代碼如下。  

public class HttpsCodecInitializer extends ChannelInitializer<Channel> {
    private final SslContext context;
    private final boolean isClient;
    public HttpsCodecInitializer(SslContext context, boolean isClient) {
        this.context = context;
        this.isClient = isClient;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        SSLEngine engine = context.newEngine(ch.alloc());
        pipeline.addFirst("ssl", new SslHandler(engine));
        if (isClient) {
            pipeline.addLast("codec", new HttpClientCodec());
        } else {
            pipeline.addLast("codec", new HttpServerCodec());
        }
    }
}

  5. WebSocket

  Netty為基於HTTP的應用程序提供了支持,WebSocket解決了長期存在的問題:如果底層使用HTTP協議,是一系列請求-響應交互,那麽如何實時發布信息。AJAX提供了一些改進,但是數據流的驅動仍然來自客戶端的請求。而WebSocket規範及其實現代表了一種更有效的解決方案,其為雙向流量提供了一個單一的TCP連接,其在客戶端和服務器之間提供真正的雙向數據交換,並且可以處理任何類型的數據。

  下圖展示了WebSocket協議,開始數據通信為純HTTP,之後升級到雙向WebSocket。

  技術分享

  為了應用中添加WebSocket支持,可以在管道中添加WebSocketFrames,其包含如下類型。

  技術分享

  下面代碼展示了如何使用WebSocketServerProtocolHandler。  

public class WebSocketServerInitializer extends ChannelInitializer<Channel>{
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ch.pipeline().addLast(
            new HttpServerCodec(),
            new HttpObjectAggregator(65536),
            new WebSocketServerProtocolHandler("/websocket"),
            new TextFrameHandler(),
            new BinaryFrameHandler(),
            new ContinuationFrameHandler());
    }
    public static final class TextFrameHandler extends
        SimpleChannelInboundHandler<TextWebSocketFrame> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
            TextWebSocketFrame msg) throws Exception {
            // Handle text frame
        }
}

public static final class BinaryFrameHandler extends
    SimpleChannelInboundHandler<BinaryWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        BinaryWebSocketFrame msg) throws Exception {
        // Handle binary frame
    }
}

public static final class ContinuationFrameHandler extends
    SimpleChannelInboundHandler<ContinuationWebSocketFrame> {
    @Override
    public void channelRead0(ChannelHandlerContext ctx,
        ContinuationWebSocketFrame msg) throws Exception {
            // Handle continuation frame
        }
    }
}

  而為了保證WebSocket的安全性,可以將SslHandler作為管道中的第一個處理器。

  2.3 空閑連接和超時

  檢測空閑連接和超時對於及時釋放資源至關重要,Netty提供了幾個處理器進行處理,如IdleStateHandler、ReadTimeoutHandler、WriteTimeoutHandler等。

  首先看看IdleStateHandler的原因,如果60S內沒有接受或發送數據,如何接收通知,可以使用向遠程對等體發送心跳消息來獲取通知; 如果沒有響應,則連接被關閉。

public class IdleStateHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(
            new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS));
        pipeline.addLast(new HeartbeatHandler());
    }
    public static final class HeartbeatHandler
        extends ChannelStateHandlerAdapter {
        private static final ByteBuf HEARTBEAT_SEQUENCE =
            Unpooled.unreleasableBuffer(Unpooled.copiedBuffer(
                "HEARTBEAT", CharsetUtil.ISO_8859_1));
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx,
            Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate())
                .addListener(
                    ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                super.userEventTriggered(ctx, evt);
            }
        }
    }
}

  2.4 解碼限定的和基於長度的協議

  1. 限定的協議

  限定消息協議使用指定的字符來標記稱為幀的消息或消息段的開始或結束位置,如如SMTP,POP3,IMAP和Telnet均使用該協議。如下的兩個解碼器用於處理限定的和基於長度的協議。

  技術分享

  下圖顯示了如何以行尾序列\ r \ n(回車+換行)分隔符來處理幀。

  技術分享

  如下代碼展示了如何使用LineBasedFrameDecoder。 

public class LineBasedHandlerInitializer extends ChannelInitializer<Channel> {
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new LineBasedFrameDecoder(64 * 1024));
        pipeline.addLast(new FrameHandler());
    }
    public static final class FrameHandler
        extends SimpleChannelInboundHandler<ByteBuf> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx,
            ByteBuf msg) throws Exception {
            // Do something with the data extracted from the frame
        }
    }
}

  DelimiterBasedFrameDecoder類可以自定義字符來處理數據。

  2. 基於長度的協議

  基於長度的協議通過在幀的頭部對其長度進行編碼來定義幀,而不是通過用特殊的分隔符標記其結尾,如下兩個類用於處理該協議。

  技術分享

  下圖展示了FixedLengthFrameDecoder的操作,每幀由8個字節構成。

  技術分享

  對於可變長度的幀,可以使用LengthFieldBasedFrameDecoder進行處理,其從頭部字段確定幀長度,並從數據流中提取指定的字節數。

  下圖展示了長度字段在頭部的偏移量0並且具有2字節。

  技術分享

  2.5 寫大數據

  由於網絡原因,異步框架中的寫入大量數據將存在問題,因為寫入操作是非阻塞的,在寫入完成時或者所有的數據都沒有被寫出就會通知ChannelFuture,此時,若不停止寫入數據,可能會造成內存溢出,因此,在寫入大量數據時,需要處理與遠程對等體連接緩慢可能導致內存釋放延遲的情況。FileRegion支持零拷貝文件傳輸的通道。

  下面是FileRegion的使用示例。  

FileInputStream in = new FileInputStream(file);
FileRegion region = new DefaultFileRegion(
    in.getChannel(), 0, file.length());
channel.writeAndFlush(region).addListener(
    new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future)
        throws Exception {
        if (!future.isSuccess()) {
            Throwable cause = future.cause();
            // Do something
        }
    }
});

  此示例僅適用於文件內容的直接傳輸,不包括應用程序對數據的任何處理,若需要將數據從文件系統復制到用戶內存,可以使用ChunkedWriteHandler,它支持異步編寫大型數據流,而不會導致高內存消耗。

  其核心是ChunkedInput<B>接口,B代碼readChunk的返回類型,對於ChunkedInput<B>,有如下的四種實現。

  技術分享

  如下是ChunkedStream的使用示例。  

public class ChunkedWriteHandlerInitializer
    extends ChannelInitializer<Channel> {
    private final File file;
    private final SslContext sslCtx;
    
    public ChunkedWriteHandlerInitializer(File file, SslContext sslCtx) {
        this.file = file;
        this.sslCtx = sslCtx;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new SslHandler(sslCtx.createEngine());
        pipeline.addLast(new ChunkedWriteHandler());
        pipeline.addLast(new WriteStreamHandler());
    }
    
    public final class WriteStreamHandler
        extends ChannelInboundHandlerAdapter {
        @Override
        public void channelActive(ChannelHandlerContext ctx)
            throws Exception {
            super.channelActive(ctx);
            ctx.writeAndFlush(
            new ChunkedStream(new FileInputStream(file)));
        }
    }
}

  2.6 序列化數據

  JDK提供了ObjectOutputStream和ObjectInputStream對POJO的原始數據類型和圖形進行序列化和反序列化,但其並不高效。

  1. JDK序列化

  當應用程序必須與使用ObjectOutputStream和ObjectInputStream與對等體進行交互時,首先應該考慮兼容性,此時JDK的序列化是正確的選擇。下圖展示了Netty與JDK進行互操作的序列化類。

  技術分享

  2. JBOSS序列化

  JBOSS比JDK序列化快三倍,並且更為緊湊,Netty使用如下兩個編解碼器來支持JBOSS。

  技術分享

  如下代碼展示了如何使用JBOSS進行序列化操作。 

public class MarshallingInitializer extends ChannelInitializer<Channel> {
    private final MarshallerProvider marshallerProvider;
    private final UnmarshallerProvider unmarshallerProvider;
    public MarshallingInitializer(
        UnmarshallerProvider unmarshallerProvider,
        MarshallerProvider marshallerProvider) {
        this.marshallerProvider = marshallerProvider;
        this.unmarshallerProvider = unmarshallerProvider;
    }
    @Override
    protected void initChannel(Channel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();
        pipeline.addLast(new MarshallingDecoder(unmarshallerProvider));
        pipeline.addLast(new MarshallingEncoder(marshallerProvider));
        pipeline.addLast(new ObjectHandler());
    }
    public static final class ObjectHandler
        extends SimpleChannelInboundHandler<Serializable> {
        @Override
        public void channelRead0(
            ChannelHandlerContext channelHandlerContext,
            Serializable serializable) throws Exception {
            // Do something
        }
    }
}

  3. Protobuf序列化

  Netty還可使用協議緩沖區進行序列化操作,其由谷歌開發,協議緩沖區以緊湊和高效的方式對結構化數據進行編碼和解碼,下圖顯示了Netty為支持protobuf而實現的ChannelHandler。

  技術分享

  下面示例展示了如何使用protobuf。 

public class ProtoBufInitializer extends ChannelInitializer<Channel> {
    private final MessageLite lite;
    public ProtoBufInitializer(MessageLite lite) {
        this.lite = lite;
    }
    @Override
    protected void initChannel(Channel ch) throws Exception {
        ChannelPipeline pipeline = ch.pipeline();
        pipeline.addLast(new ProtobufVarint32FrameDecoder());
        pipeline.addLast(new ProtobufEncoder());
        pipeline.addLast(new ProtobufDecoder(lite));
        pipeline.addLast(new ObjectHandler());
    }
    public static final class ObjectHandler
        extends SimpleChannelInboundHandler<Object> {
        @Override
        public void channelRead0(ChannelHandlerContext ctx, Object msg)
            throws Exception {
            // Do something with the object
        }
    }
}

三、總結

  本篇博文講解了如何保證Netty應用程序的安全性,以及如何處理不同協議的數據,以及編解碼器和ChannelHandler之間的關系,以及如何把編解碼器添加至管道中進行處理。也謝謝各位園友的觀看~

【Netty】ChannelHandler和codec