1. 程式人生 > >架構設計:系統間通訊(6)——IO通訊模型和Netty 上篇

架構設計:系統間通訊(6)——IO通訊模型和Netty 上篇

1、Netty介紹

在Netty官網上,對於Netty的介紹是:

Netty is a NIO client server framework which enables quick and easy development of network applications such as protocol servers and clients. It greatly simplifies and streamlines network programming such as TCP and UDP socket server.

‘Quick and easy’ doesn’t mean that a resulting application will suffer from a maintainability or a performance issue. Netty has been designed carefully with the experiences earned from the implementation of a lot of protocols such as FTP, SMTP, HTTP, and various binary and text-based legacy protocols. As a result, Netty has succeeded to find a way to achieve ease of development, performance, stability, and flexibility without a compromise.

百度上的中文解釋是:

Netty是由JBOSS提供的一個java開源框架。Netty提供非同步的、事件驅動的網路應用程式框架和工具,用以快速開發高效能、高可靠性的網路伺服器和客戶端程式。

但實際上呢,Netty框架並不只是封裝了多路複用的IO模型,也包括提供了傳統的阻塞式/非阻塞式 同步IO的模型封裝。當然,從Netty官網上的幾句中文並不能概括完Netty的全部作用。下面的兩篇文章我們將會在您已經理解原生的JAVA NIO框架的基礎上,向您介紹Netty的原理和使用。

這裡說明一下,講解Netty並不是我們這個系列“系統間通訊”的內容重點。目的是通過講解IO通訊模型、JAVA對各種通訊模型的支援、上層的Netty/MINA封裝,可以讓大家深刻理解“系統間通訊”中一個重要要素——資訊如何傳遞

2、Netty快速上手

2-1、程式碼示例

下面這段程式碼本身就比較好理解,我在其上又加上了比較詳細的註解。相信就算您之前沒有接觸過Netty,也應該是可以看懂的。如果您之前接觸過Netty,那您可以發現,這段程式碼中基本上已經包含了Netty中比較重要的幾個概念了:Channel、Buffer、ChannelPipeline、ChannelHandler、ChannelHandlerContext等

是的,我們將從這個示例程式碼入手,介紹Netty的基本概念和使用。然後我們再回頭看看上文中的那個問題:為什麼已經有的JAVA NIO框架,還需要一個Netty呢

package
testNetty; import java.net.InetSocketAddress; import java.nio.channels.spi.SelectorProvider; import java.util.concurrent.ThreadFactory; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ChannelHandler.Sharable; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.bytes.ByteArrayDecoder; import io.netty.handler.codec.bytes.ByteArrayEncoder; import io.netty.util.AttributeKey; import io.netty.util.concurrent.DefaultThreadFactory; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.log4j.BasicConfigurator; public class TestTCPNetty { static { BasicConfigurator.configure(); } public static void main(String[] args) throws Exception { //這就是主要的服務啟動器 ServerBootstrap serverBootstrap = new ServerBootstrap(); //=======================下面我們設定執行緒池 //BOSS執行緒池 EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1); //WORK執行緒池:這樣的申明方式,主要是為了向讀者說明Netty的執行緒組是怎樣工作的 ThreadFactory threadFactory = new DefaultThreadFactory("work thread pool"); //CPU個數 int processorsNumber = Runtime.getRuntime().availableProcessors(); EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider()); //指定Netty的Boss執行緒和work執行緒 serverBootstrap.group(bossLoopGroup , workLoogGroup); //如果是以下的申明方式,說明BOSS執行緒和WORK執行緒共享一個執行緒池 //(實際上一般的情況環境下,這種共享執行緒池的方式已經夠了) //serverBootstrap.group(workLoogGroup); //========================下面我們設定我們服務的通道型別 //只能是實現了ServerChannel介面的“伺服器”通道類 serverBootstrap.channel(NioServerSocketChannel.class); //當然也可以這樣建立(那個SelectorProvider是不是感覺很熟悉?) //serverBootstrap.channelFactory(new ChannelFactory<NioServerSocketChannel>() { // @Override // public NioServerSocketChannel newChannel() { // return new NioServerSocketChannel(SelectorProvider.provider()); // } //}); //========================設定處理器 //為了演示,這裡我們設定了一組簡單的ByteArrayDecoder和ByteArrayEncoder //Netty的特色就在這一連串“通道水管”中的“處理器” serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() { /* (non-Javadoc) * @see io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.Channel) */ @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ByteArrayEncoder()); ch.pipeline().addLast(new TCPServerHandler()); ch.pipeline().addLast(new ByteArrayDecoder()); } }); //========================設定netty伺服器繫結的ip和埠 serverBootstrap.option(ChannelOption.SO_BACKLOG, 128); serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true); serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 83)); //還可以監控多個埠 //serverBootstrap.bind(new InetSocketAddress("0.0.0.0", 84)); } } /** * @author yinwenjie */ @Sharable class TCPServerHandler extends ChannelInboundHandlerAdapter { /** * 日誌 */ private static Log LOGGER = LogFactory.getLog(TCPServerHandler.class); /** * 每一個channel,都有獨立的handler、ChannelHandlerContext、ChannelPipeline、Attribute * 所以不需要擔心多個channel中的這些物件相互影響。<br> * 這裡我們使用content這個key,記錄這個handler中已經接收到的客戶端資訊。 */ private static AttributeKey<StringBuffer> content = AttributeKey.valueOf("content"); /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRegistered(io.netty.channel.ChannelHandlerContext) */ @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.channelRegistered(ctx)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelUnregistered(io.netty.channel.ChannelHandlerContext) */ @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.channelUnregistered(ctx)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelActive(io.netty.channel.ChannelHandlerContext) */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.channelActive(ctx) = " + ctx.toString()); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelInactive(io.netty.channel.ChannelHandlerContext) */ @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.channelInactive(ctx)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelRead(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { TCPServerHandler.LOGGER.info("channelRead(ChannelHandlerContext ctx, Object msg)"); /* * 我們使用IDE工具模擬長連線中的資料緩慢提交。 * 由read方法負責接收資料,但只是進行資料累加,不進行任何處理 * */ ByteBuf byteBuf = (ByteBuf)msg; try { StringBuffer contextBuffer = new StringBuffer(); while(byteBuf.isReadable()) { contextBuffer.append((char)byteBuf.readByte()); } //加入臨時區域 StringBuffer content = ctx.attr(TCPServerHandler.content).get(); if(content == null) { content = new StringBuffer(); ctx.attr(TCPServerHandler.content).set(content); } content.append(contextBuffer); } catch(Exception e) { throw e; } finally { byteBuf.release(); } } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelReadComplete(io.netty.channel.ChannelHandlerContext) */ @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.channelReadComplete(ChannelHandlerContext ctx)"); /* * 由readComplete方法負責檢查資料是否接收完了。 * 和之前的文章一樣,我們檢查整個內容中是否有“over”關鍵字 * */ StringBuffer content = ctx.attr(TCPServerHandler.content).get(); //如果條件成立說明還沒有接收到完整客戶端資訊 if(content.indexOf("over") == -1) { return; } //當接收到資訊後,首先要做的的是清空原來的歷史資訊 ctx.attr(TCPServerHandler.content).set(new StringBuffer()); //準備向客戶端傳送響應 ByteBuf byteBuf = ctx.alloc().buffer(1024); byteBuf.writeBytes("回發響應資訊!".getBytes()); ctx.writeAndFlush(byteBuf); /* * 關閉,正常終止這個通道上下文,就可以關閉通道了 * (如果不關閉,這個通道的回話將一直存在,只要網路是穩定的,伺服器就可以隨時通過這個回話向客戶端傳送資訊)。 * 關閉通道意味著TCP將正常斷開,其中所有的 * handler、ChannelHandlerContext、ChannelPipeline、Attribute等資訊都將登出 * */ ctx.close(); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, java.lang.Object) */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { TCPServerHandler.LOGGER.info("super.userEventTriggered(ctx, evt)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#channelWritabilityChanged(io.netty.channel.ChannelHandlerContext) */ @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.channelWritabilityChanged(ctx)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelInboundHandlerAdapter#exceptionCaught(io.netty.channel.ChannelHandlerContext, java.lang.Throwable) */ @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { TCPServerHandler.LOGGER.info("super.exceptionCaught(ctx, cause)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelHandlerAdapter#handlerAdded(io.netty.channel.ChannelHandlerContext) */ @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.handlerAdded(ctx)"); } /* (non-Javadoc) * @see io.netty.channel.ChannelHandlerAdapter#handlerRemoved(io.netty.channel.ChannelHandlerContext) */ @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { TCPServerHandler.LOGGER.info("super.handlerRemoved(ctx)"); } }

這是server端的程式碼。就像我在前文中提到的一樣,客戶端是否使用了NIO技術實際上對整個系統架構的效能影響不大。您可以使用任何支援TCP/IP協議技術的程式碼,作為客戶端。可以使用Python、C++、C#、JAVA等等任意的程式語言。

如果您真的想更簡單一些,看看客戶端的程式碼,好吧,下面是我寫的一個。實際上這段程式碼來自於我的博文《架構設計:系統間通訊(1)——概述從“聊天”開始上篇》中的程式碼示例:

package testBSocket;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.URLEncoder;
import java.util.concurrent.CountDownLatch;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.BasicConfigurator;

/**
 * 一個SocketClientRequestThread執行緒模擬一個客戶端請求。
 * @author yinwenjie
 */
public class SocketClientRequestThread implements Runnable {

    static {
        BasicConfigurator.configure();
    }

    /**
     * 日誌
     */
    private static final Log LOGGER = LogFactory.getLog(SocketClientRequestThread.class);

    private CountDownLatch countDownLatch;

    /**
     * 這個線層的編號
     * @param countDownLatch
     */
    private Integer clientIndex;

    /**
     * countDownLatch是java提供的同步計數器。
     * 當計數器數值減為0時,所有受其影響而等待的執行緒將會被啟用。這樣保證模擬併發請求的真實性
     * @param countDownLatch
     */
    public SocketClientRequestThread(CountDownLatch countDownLatch , Integer clientIndex) {
        this.countDownLatch = countDownLatch;
        this.clientIndex = clientIndex;
    }

    @Override
    public void run() {
        Socket socket = null;
        OutputStream clientRequest = null;
        InputStream clientResponse = null;

        try {
            socket = new Socket("localhost",83);
            clientRequest = socket.getOutputStream();
            clientResponse = socket.getInputStream();

            //等待,直到SocketClientDaemon完成所有執行緒的啟動,然後所有執行緒一起傳送請求
            this.countDownLatch.await();

            //傳送請求資訊
            clientRequest.write(URLEncoder.encode("這是第" + this.clientIndex + " 個客戶端的請求11。", "UTF-8").getBytes());
            clientRequest.flush();
            clientRequest.write(URLEncoder.encode("這是第" + this.clientIndex + " 個客戶端的請求22。over","UTF-8").getBytes());

            //在這裡等待,直到伺服器返回資訊
            SocketClientRequestThread.LOGGER.info("第" + this.clientIndex + "個客戶端的請求傳送完成,等待伺服器返回資訊");
            int maxLen = 1024;
            byte[] contextBytes = new byte[maxLen];
            int realLen;
            String message = "";
            //程式執行到這裡,會一直等待伺服器返回資訊(注意,前提是in和out都不能close,如果close了就收不到伺服器的反饋了)
            while((realLen = clientResponse.read(contextBytes, 0, maxLen)) != -1) {
                message += new String(contextBytes , 0 , realLen);
            }
            SocketClientRequestThread.LOGGER.info("接收到來自伺服器的資訊:" + message);
        } catch (Exception e) {
            SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
        } finally {
            try {
                if(clientRequest != null) {
                    clientRequest.close();
                }
                if(clientResponse != null) {
                    clientResponse.close();
                }
            } catch (IOException e) {
                SocketClientRequestThread.LOGGER.error(e.getMessage(), e);
            }
        }
    }
}

雖然示例程式碼中已經有比較詳細的註釋說明,但是為了讓您更清楚伺服器程式碼的含義,下面一個小結我們針對程式碼中重要的內容進行講解。

2-2、程式碼片段講解

//BOSS執行緒池
EventLoopGroup bossLoopGroup = new NioEventLoopGroup(1);

BOSS執行緒池實際上就是JAVA NIO框架中selector工作角色(這個後文會詳細講),針對一個本地IP的埠,BOSS執行緒池中有一條執行緒工作,工作內容也相對簡單,就是發現新的連線;Netty是支援同時監聽多個埠的,所以BOSS執行緒池的大小按照需要監聽的伺服器埠數量進行設定就行了。

//Work執行緒池
int processorsNumber = Runtime.getRuntime().availableProcessors();
EventLoopGroup workLoogGroup = new NioEventLoopGroup(processorsNumber * 2, threadFactory, SelectorProvider.provider());

這段程式碼主要是確定Netty中工作執行緒池的大小,這個大小一般是物理機器/虛擬機器器 可用核心的個數 * 2。work執行緒池中的執行緒(如果封裝的是JAVA NIO,那麼具體的執行緒實現類就是NioEventLoop)都固定負責指派給它的網路連線的事件監聽,並根據事件狀態,呼叫不同的ChannelHandler事件方法。而最後一個引數SelectorProvider說明了這個EventLoop所使用的多路複用IO模型為作業系統決定

serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);

option方法,可以設定這個ServerChannel相應的各種屬性(在程式碼中我們使用的是NioServerSocketChannel);childOption方法用於設定這個ServerChannel收到客戶端時間後,所生成的新的Channel的各種屬性(程式碼中,我們生成的是NioSocketChannel)。詳細的option引數可以參見ChannelOption類中的註釋說明。

3、重要概念

3-1、Netty執行緒機制

還記的我們在講解JAVA NIO框架對 多路複用IO技術 的支援時,講到的Selector選擇器嗎?它大致的工作方式是:

while(true) {
    if(selector.select(100) == 0) {
        //================================================
        //      這裡視業務情況,可以做一些然並卵的事情
        //================================================
        continue;
    }

    Iterator<SelectionKey> selecionKeys = selector.selectedKeys().iterator();

    while(selecionKeys.hasNext()) {
        SelectionKey readyKey = selecionKeys.next();
        selecionKeys.remove();

        SelectableChannel selectableChannel = readyKey.channel();
        if(readyKey.isValid() && readyKey.isAcceptable()) {
            。。。
        } else if(readyKey.isValid()&&readyKey.isConnectable()) {
            。。。
        } else if(readyKey.isValid()&&readyKey.isReadable()) {
            。。。
        }
    }
}

在前文介紹JAVA對多路複用IO技術的支援中,我們說過,Selector可以是在主執行緒上面操作,也可以是一個獨立的執行緒進行操作。在Netty中,這裡的部分工作就是交給BOSS執行緒做的。BOSS執行緒負責發現連線到伺服器的新的channel(SocketServerChannel的ACCEPT事件),並且將這個channel經過檢查後註冊到WORK連線池的某個EventLoop執行緒中

而當WORK執行緒發現作業系統有一個它感興趣的IO事件時(例如SocketChannel的READ事件)則呼叫相應的ChannelHandler事件。當某個channel失效後(例如顯示呼叫ctx.close())這個channel將從繫結的EventLoop中被剔除。

在Netty中,如果我們使用的是一個JAVA NIO框架的封裝,那麼進行這個迴圈的是NioEventLoop類(實現多路複用的支援時)。參見該類中的processSelectedKeysPlain方法 和 processSelectedKey方法。另外在這個類中Netty解決了之前我們說到的java nio中”Selector.select(timeout) CPU 100%” 的BUG和一個“NullPointerException in Selector.open()”(http://bugs.java.com/view_bug.do?bug_id=6427854)的BUG:

processSelectedKeysPlain方法

for (;;) {
    final SelectionKey k = i.next();
    final Object a = k.attachment();
    i.remove();

    if (a instanceof AbstractNioChannel) {
        processSelectedKey(k, (AbstractNioChannel) a);
    } else {
        @SuppressWarnings("unchecked")
        NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
        processSelectedKey(k, task);
    }

    if (!i.hasNext()) {
        break;
    }

    if (needsToSelectAgain) {
        selectAgain();
        selectedKeys = selector.selectedKeys();

        // Create the iterator again to avoid ConcurrentModificationException
        if (selectedKeys.isEmpty()) {
            break;
        } else {
            i = selectedKeys.iterator();
        }
    }
}

processSelectedKey方法:

if (!k.isValid()) {
   // close the channel if the key is not valid anymore
    unsafe.close(unsafe.voidPromise());
    return;
}

try {
    int readyOps = k.readyOps();
    // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
    // to a spin loop
    if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
        unsafe.read();
        if (!ch.isOpen()) {
            // Connection already closed - no need to handle write.
            return;
        }
    }
    if ((readyOps & SelectionKey.OP_WRITE) != 0) {
        // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
        ch.unsafe().forceFlush();
    }
    if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
        // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
        // See https://github.com/netty/netty/issues/924
        int ops = k.interestOps();
        ops &= ~SelectionKey.OP_CONNECT;
        k.interestOps(ops);

        unsafe.finishConnect();
    }
} catch (CancelledKeyException ignored) {
    unsafe.close(unsafe.voidPromise());
}

一個Work執行緒池的執行緒將按照底層JAVA NIO的Selector的事件狀態,決定執行ChannelHandler中的哪一個事件方法(Netty中的事件,包括了channelRegistered、channelUnregistered、channelActive、channelInactive等事件方法)。執行完成後,work執行緒將一直輪詢直到作業系統回覆下一個它所管理的channel發生了新的IO事件

3-2、ByteBuf

Netty uses its own buffer API instead of NIO ByteBuffer to represent a sequence of bytes. This approach has significant advantages over using ByteBuffer. Netty’s new buffer type, ChannelBuffer has been designed from the ground up to address the problems of ByteBuffer and to meet the daily needs of network application developers. To list a few cool features:

  • You can define your own buffer type if necessary.
  • Transparent zero copy is achieved by a built-in composite buffer type.
  • A dynamic buffer type is provided out-of-the-box, whose capacity is expanded on demand, just like StringBuffer.
  • There’s no need to call flip() anymore.
  • It is often faster than ByteBuffer.

上面的引用來自於JBOSS-Netty官方文件中,對ByteBuf快取的解釋。翻譯成中文就是:Netty重寫了JAVA NIO框架中的快取結構,並將這個結構應用在更上層的封裝中。

為什麼要重寫呢?JBOSS-Netty給出的解釋是:我寫的快取比JAVA中的ByteBuffer牛。好吧,作為一個屌絲IT從業人員,我想我不能說什麼了。

這裡寫圖片描述

這裡說一說Netty中幾個比較特別的ByteBuf實現:

  • io.netty.buffer.EmptyByteBuf:這是一個初始容量和最大容量都為0的快取區。一般我們用這種快取區描述“沒有任何處理結果”,並將其向下一個handler傳遞。

  • io.netty.buffer.ReadOnlyByteBuf:這是一個不允許任何“寫請求”的只讀快取區。一般是通過Unpooled.unmodifiableBuffer(ByteBuf)方法將某一個正常可讀寫的快取區轉變而成。如果我們需要在下一個Handler處理的過程中禁止寫入任何資料到快取區,就可以在這個handler中進行“只讀快取區”的轉換。

  • io.netty.buffer.UnpooledDirectByteBuf:基本的JAVA NIO框架的ByteBuffer封裝。一般我們直接使用這個快取區實現來處理Handler事件。

  • io.netty.buffer.PooledByteBuf:Netty4.X版本的快取新特性,主要是為了減少之前unpoolByteBuf在建立和銷燬時的GC時間。

3-3、Channel

Channel,通道。您可以使用JAVA NIO中的Channel去初次理解它,但實際上它的意義和JAVA NIO中的通道意義還不一樣。我們可以理解成:“更抽象、更豐富”。如下如所示:

這裡寫圖片描述

  • Netty中的Channel專門代表網路通訊,這個和JAVA NIO框架中的Channel不一樣,後者中還有類似FileChannel本地檔案IO通道。由於前者專門代表網路通訊,所以它是由客戶端地址 + 伺服器地址 + 網路操作狀態構成的,請參見io.netty.channel.Channel介面的定義。

  • 每一個Netty中的Channel,比JAVA NIO中的Channel更抽象。這是為什麼呢?在Netty中,不止封裝了JAVA NIO的IO模型,還封裝了JAVA BIO的阻塞同步IO通訊模型。將他們在表現上都抽象成Channel了。這就是為什麼Netty中有io.netty.channel.oio.AbstractOioChannel這樣的抽象類。

  • 其io.netty.channel.oio.AbstractOioChannel抽象類上的註解也說明得比較清楚:

Abstract base class for Channel implementations that use Old-Blocking-IO

  • 您可以這樣理解:Netty的Channel更具業務抽象性。

3-4、ChannelPipeline和ChannelHandler

  • Netty中的每一個Channel,都有一個獨立的ChannelPipeline,中文稱為“通道水管”。只不過這個水管是雙向的裡面流淌著資料,資料可以通過這個“水管”流入到伺服器,也可以通過這個“水管”從伺服器流出。

  • 在ChannelPipeline中,有若干的過濾器。我們稱之為“ChannelHandler”(處理器或者過濾器)。同“流入”和“流出”的概念向對應:用於處理/過濾 流入資料的ChannelHandler,稱之為“ChannelInboundHandler”;用於處理/過濾 流出資料的ChannelHandler,稱之為“ChannelOutboundHandler”

如下圖所示:
這裡寫圖片描述

3-4-1、責任鏈和介面卡的應用

  • 資料在ChannelPipeline中有一個一個的Handler進行處理,並形成一個新的資料狀態。這是典型的“責任鏈”模式。

  • 需要注意,雖然資料管道中的Handler是按照順序執行的,但不代表某一個Handler會處理任何一種由“上一個handler”傳送過來的資料。某些Handler會檢查傳來的資料是否符合要求,如果不符合自己的處理要求,則不進行處理。

  • 我們可以實現ChannelInboundHandler介面或者ChannelOutboundHandler介面,來實現我們自己業務的“資料流入處理器”或者“資料流出”處理器。

  • 但是這兩個介面的事件方法是比較多的,例如ChannelInboundHandler介面一共有11個需要實現的介面方法(包括父級ChannelHandler的,我們在下一節講解Channel的生命週期時,回專門講到這些事件的執行順序和執行狀態),一般情況下我們不需要把這些方法全部實現

  • 所以Netty中增加了兩個介面卡“ChannelInboundHandlerAdapter”和“ChannelOutboundHandlerAdapter”來幫助我們去實現我們只需要實現的事件方法。其他的事件方法我們就不需要關心了:

這裡寫圖片描述

  • 在我上文給出的示例程式碼中,書寫的業務處理器TCPServerHandler就是繼承了ChannelInboundHandlerAdapter介面卡。下面,我們將介紹幾個常使用的ChannelInboundHandler處理器和ChannelOutboundHandler處理器

3-4-2、ChannelInboundHandler類舉例

  • HttpRequestDecoder:實現了Http協議的資料輸入格式的解析。這個類將資料編碼為HttpMessage物件,並交由下一個ChannelHandler進行處理。

  • ByteArrayDecoder:最基礎的資料流輸入處理器,將所有的byte轉換為ByteBuf物件(一般的實現類是:io.netty.buffer.UnpooledUnsafeDirectByteBuf)。我們進行一般的文字格式資訊傳輸到伺服器時,最好使用這個Handler將byte陣列轉換為ByteBuf物件。

  • DelimiterBasedFrameDecoder:這個資料流輸入處理器,會按照外部傳入的資料中給定的某個關鍵字元/關鍵字串,重新將資料組裝為新的段落併發送給下一個Handler處理器。後文中,我們將使用這個處理器進行TCP半包的問題。

  • 還有很多直接支援標準資料格式解析的處理器,例如支援Google Protocol Buffers 資料格式解析的ProtobufDecoder和ProtobufVarint32FrameDecoder處理器。

3-4-3、ChannelOutboundHandler類舉例

  • HttpResponseEncoder:這個類和HttpRequestDecoder相對應,是將伺服器端HttpReponse物件的描述轉換成ByteBuf物件形式,並向外傳播。

  • ByteArrayEncoder:這個類和ByteArrayDecoder,是將伺服器端的ByteBuf物件轉換成byte陣列的形式,並向外傳播。一般也和ByteArrayDecoder物件成對使用。

  • 還有支援標準的編碼成Google Protocol Buffers格式、JBoss Marshalling 格式、ZIP壓縮格式的ProtobufEncoder、ProtobufVarint32LengthFieldPrepender、MarshallingEncoder、JZlibEncoder

4、Channel的生命週期

上面第3小節,講到了Netty中的重要概念。我們花很大篇幅講解了Channel、ChannelPipeline、ChannelHandler,以及他們的聯絡和工作方式。

在說到ChannelInHandler為什麼會使用“介面卡”模式的時候,特別指出了原因:因為ChannelInHandler介面中的方法加上父級介面中的方法,總共有11個介面事件方法需要實現。而事實上很多時候我們只會關心其中的一個或者兩個介面方法。

那麼這些方法是什麼時候被觸發的呢?這就要說到Netty中一個Channel的生命週期了(這裡我們考慮的生命週期是指Netty對JAVA NIO技術框架的封裝):

這裡寫圖片描述

這裡有一個channel事件沒有在圖中說明,就是exceptionCaught(ChannelHandlerContext, Throwable)事件。只要在呼叫圖中的所有事件方法時,有異常丟擲,exceptionCaught方法就會被呼叫。

另外,不是channelReadComplete(ChannelHandlerContext)方法呼叫後就一定會呼叫channelInactive事件方法。channelReadComplete和channelRead是可以反覆呼叫的,只要客戶端有資料傳送過來。

最後補充一句,這個生命週期的事件方法呼叫順序只是針對Netty封裝使用JAVA NIO框架時,並且在進行TCP/IP協議監聽時的事件方法呼叫順序

5、再次審視為什麼使用Netty

下一篇文章,我們將回歸最初的這個問題。重新總結Netty面向業務的框架和JAVA NIO面向技術的框架的區別。並使用Netty程式碼解決IO網路通訊中的實際問題。

相關推薦

架構設計系統通訊6——IO通訊模型Netty

1、Netty介紹 在Netty官網上,對於Netty的介紹是: Netty is a NIO client server framework which enables quick and easy development of network ap

架構設計系統存儲28——分布式文件系統Ceph掛載

all 兩個文件 原因 之前 來看 大數據 details 失敗 variable (接上文《架構設計:系統存儲(27)——分布式文件系統Ceph(安裝)》) 3. 連接到Ceph系統 3-1. 連接客戶端 完畢Ceph文件系統的創建過程後。就

架構設計系統通訊36——Apache Camel快速入門

架構設計:系統間通訊(36)——Apache Camel快速入門(上) :http://blog.csdn.net/yinwenjie(未經允許嚴禁用於商業用途!) https://blog.csdn.net/yinwenjie/article/details/51692340 1、本專題主

架構設計系統通訊34——被神化的ESB

1、概述 從本篇文章開始,我們將花一到兩篇的篇幅介紹ESB(企業服務匯流排)技術的基本概念,為讀者們理清多個和ESB技術有關名詞。我們還將在其中為讀者闡述什麼情況下應該使用ESB技術。接下來,為了加深讀者對ESB技術的直觀理解,我們將利用Apache Came

架構設計系統通訊16——服務治理與Dubbo 中篇預熱

1、前序 上篇文章中(《架構設計:系統間通訊(15)——服務治理與Dubbo 上篇》),我們以示例的方式講解了阿里DUBBO服務治理框架基本使用。從這節開始我們將對DUBBO的主要模組的設計原理進行講解,從而幫助讀者理解DUBBO是如何工作的。(由於這個章節的內容比較多,包括了知識準備、DUBBO框架概述

架構設計系統通訊23——提高ActiveMQ工作效能

6、ActiveMQ處理規則和優化 在ActiveMQ單個服務節點的優化中,除了對ActiveMQ單個服務節點的網路IO模型進行優化外,生產者傳送訊息的策略和消費者處理訊息的策略也關乎整個訊息佇列系統是否能夠高效工作。請看下圖所示的訊息生產者和訊息消費

架構設計系統通訊15——服務治理與Dubbo

1、上篇中“自定義服務治理框架”的問題 在之前的文章中(《架構設計:系統間通訊(13)——RPC例項Apache Thrift 下篇(1)》、《架構設計:系統間通訊(14)——RPC例項Apache Thrift 下篇(2)》),我們基於服務治理的基本原理,自

架構設計系統通訊21——ActiveMQ的安裝與使用

1、前言 之前我們通過兩篇文章(架構設計:系統間通訊(19)——MQ:訊息協議(上)、架構設計:系統間通訊(20)——MQ:訊息協議(下))從理論層面上為大家介紹了訊息協議的基本定義,並花了較大篇幅向讀者介紹了三種典型的訊息協議:XMPP協議、Stomp協議和

架構設計系統通訊10——RPC的基本概念

1、概述 經過了詳細的資訊格式、網路IO模型的講解,並且通過JAVA RMI的講解進行了預熱。從這篇文章開始我們將進入這個系列博文的另一個重點知識體系的講解:RPC。在後續的幾篇文章中,我們首先講解RPC的基本概念,一個具體的RPC實現會有哪些基本要素構成,然

架構設計系統通訊40——自己動手設計ESB1

1、概述 在我開始構思這幾篇關於“自己動手設計ESB中介軟體”的文章時,曾有好幾次動過放棄的念頭。原因倒不是因為對冗長的文章產生了惰性,而是ESB中所涉及到的技術知識和需要突破的設計難點實在是比較多,再冗長的幾篇博文甚至無法對它們全部進行概述,另外如果在思路上

架構設計系統通訊39——Apache Camel快速入門下2

4-2-1、LifecycleStrategy LifecycleStrategy介面按照字面的理解是一個關於Camel中元素生命週期的規則管理器,但實際上LifecycleStrategy介面的定義更確切的應該被描述成一個監聽器: 當Camel

架構設計系統通訊24——提高ActiveMQ工作效能

7、ActiveMQ的持久訊息儲存方案 前文已經講過,當ActiveMQ接收到PERSISTENT Message訊息後就需要藉助持久化方案來完成PERSISTENT Message的儲存。這個介質可以是磁碟檔案系統、可以是ActiveMQ的內建資料庫

架構設計系統通訊37——Apache Camel快速入門

(補上文:Endpoint重要的漏講內容) 3-1-2、特殊的Endpoint Direct Endpoint Direct用於在兩個編排好的路由間實現Exchange訊息的連線,上一個路由中由最後一個元素處理完的Exchange物件,將被髮送至由D

架構設計系統通訊28——Kafka及場景應用中1

在本月初的寫作計劃中,我本來只打算粗略介紹一下Kafka(同樣是因為進度原因)。但是,最近有很多朋友要求我詳細講講Kafka的設計和使用,另外兩年前我在研究Kafka準備將其應用到生產環境時,由於沒有仔細理解Kafka的設計結構所導致的問題最後也還沒有進行交

架構設計系統通訊26——ActiveMQ叢集方案

3、ActiveMQ熱備方案 ActiveMQ熱備方案,主要保證ActiveMQ的高可用性。這種方案並不像上節中我們主要討論的ActiveMQ高效能方案那樣,同時有多個節點都處於工作狀態,也就是說這種方案並不提高ActiveMQ叢集的效能;而是從叢集中的多

架構設計系統通訊38——Apache Camel快速入門下1

3-5-2-3迴圈動態路由 Dynamic Router 動態迴圈路由的特點是開發人員可以通過條件表示式等方式,動態決定下一個路由位置。在下一路由位置處理完成後Exchange將被重新返回到路由判斷點,並由動態迴圈路由再次做出新路徑的判斷。如此迴圈執行

架構設計系統通訊2——概述從“聊天”開始下篇

【轉】https://blog.csdn.net/yinwenjie/article/details/48344989 4-3、NIO通訊框架 目前流行的NIO框架非常的多。在論壇上、網際網路上大家討論和使用最多的有以下幾種: 原生JAVA NIO框架:

架構設計系統通訊——ActiveMQ叢集方案

1、綜述 通過之前的文章,我們討論了ActiveMQ的基本使用,包括單個ActiveMQ服務節點的效能特徵,關鍵調整引數;我們還介紹了單個ActiveMQ節點上三種不同的持久化儲存方案,並討論了這三種不同的持久化儲存方案的配置和效能特點。但是這還遠遠不夠,因為在生產環境

架構設計系統通訊——MQ訊息協議

1、概述從本文開始,我們介紹另一型別的系統間通訊及輸:MQ訊息佇列。首先我們將討論幾種常用訊息佇列協議的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然後在這個基礎上介紹兩款MQ產品:ActiveMQ和RabbitMQ,它們是現在業務系

架構設計系統儲存18——Redis叢集方案高效能

1、概述 通過上一篇文章(《架構設計:系統儲存(17)——Redis叢集方案:高可用》)的內容,Redis主從複製的基本功能和進行Redis高可用叢集監控的Sentinel基本功能基本呈現給了讀者。雖然本人並不清楚上一篇根據筆者實際工作經驗所撰寫的文章有什麼重