1. 程式人生 > >Netty(EventLoop 和執行緒模型)

Netty(EventLoop 和執行緒模型)

EventLoop介面

    Netty的EventLoop是協同設計的一部分,它採用了兩個基本的API:併發和網路程式設計。首先,io.netty.util.concurrent包構建在JDK的java.util.concurrent包上,用來提供執行緒執行器。其次,io.netty.channel包中的類,為了與Channel的事件進行互動,擴充套件了這些介面/類。

    在這個模型中,一個EventLoop將由一個永遠都不會改變的Thread驅動,  同時任務(Runnable或者Callable)可以直接提交給EventLoop實現,  以立即執行或者排程執行。根據配置和可用核心的不同,可能會建立多個EventLoop例項用以優化資源的使用,並且單個EventLoop可能會被指派用於服務多個Channel。 

    需要注意的是,Netty的EventLoop在繼承了ScheduledExecutorService的同時,只定義了一個方法,parent()。這個方法,如下面的程式碼片斷所示,用於返回到當前EventLoop實現的例項所屬的EventLoopGroup的引用。

public interface EventLoop extends OrderedEventExecutor, EventLoopGroup {
    @Override
    EventLoopGroup parent();
}

事件/任務的執行順序  事件和任務是以先進先出(FIFO)的順序執行的。這樣可以通過保證位元組內容總是按正確的順序被處理,消除潛在的資料損壞的可能性。

    通過原始碼分析可以看到所有的任務都會在這個方法中執行,而NioEventLoop的run方法最外層套了一個for (;;),它會判斷是否有任務提交,然後不斷迴圈該內部程式碼。

Netty 4 中的 I/O 和事件處理

    由I/O操作觸發的事件將流經安裝了一個或者多個ChannelHandler的ChannelPipeline。傳播這些事件的方法呼叫可以隨後被Channel-Handler所攔截,並且可以按需地處理事件。在Netty4中,所有的I/O操作和事件都由已經被分配給了EventLoop的那個Thread來處理。

Netty 3 中的 I/O操作

    在以前的版本中所使用的執行緒模型只保證了入站(之前稱為上游)事件會在所謂的I/O執行緒(對應於Netty4中的EventLoop)中執行。所有的出站(下游)事件都由呼叫執行緒處理,其可能是I/O執行緒也可能是別的執行緒。但是其需要在ChannelHandler中對出站事件進行仔細的同步。 簡而言之,不可能保證多個執行緒不會在同一時刻嘗試訪問出站事件。

任務排程

JDK  的任務排程API

    在Java5之前,任務排程是建立在java.util.Timer類之上的,其使用了一個後臺Thread,並且具有 與標準執行緒相同的限制。隨後,JDK提供了java.util.concurrent包,它定義了interface ScheduledExecutorService。

java.util.concurrent.Executors類的工廠方法:

方法 描述

newScheduledThreadPool(int corePoolSize)

newScheduledThreadPool(int corePoolSize,ThreadFactory threadFactory)

建立一個ScheduledThreadExecutorService,用於排程命令在指定延遲之後執行或者週期性地執行。它使用corePoolSize引數來計算執行緒數

newSingleThreadScheduledExecutor()

newSingleThreadScheduledExecutor(ThreadFactory threadFactory)

建立一個ScheduledThreadExecutorService,用於排程命令在指定延遲之後執行或者週期性地執行。它使用一個執行緒來執行被排程的任務

使用 EventLoop排程任務

    ScheduledExecutorService的實現具有侷限性,例如,事實上作為執行緒池管理的一部分,將會有額外的執行緒建立。如果有大量任務被緊湊地排程,那麼這將 成為一個瓶頸。Netty通過Channel的EventLoop實現任務排程解決了這一問題:

 ctx.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("開啟非同步執行緒");
                        }
                    },1,TimeUnit.SECONDS);//排程任務在從現在開始的1秒之後執行

如果要開啟一個週期性的任務,如心跳檢測:

   ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            ctx.writeAndFlush(Unpooled.copiedBuffer("傳送心跳"+new Date(), Charset.forName("UTF-8")));
                        }
                    },1,5, TimeUnit.SECONDS);//排程在1秒之後,並且以後每間隔5秒執行

如果想要取消這個任務可以這麼寫:

  ScheduledFuture<?> future=ctx.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("開啟非同步執行緒");
                        }
                    },1,TimeUnit.SECONDS);
                    future.cancel(false);

通過呼叫返回值future的cancel方法進行取消任務。

實現細節

執行緒管理 

    Netty執行緒模型的卓越效能取決於對於當前執行的Thread的身份的確定,也就是說,確定它是否是分配給當前Channel以及它的EventLoop的那一個執行緒。(EventLoop將負責處理一個Channel的整個生命週期內的所有事件。)

    如果(當前)呼叫執行緒正是支撐EventLoop的執行緒,那麼所提交的程式碼塊將會被 (直接 )執行。否則,EventLoop將排程該任務以便稍後執行,並將它放入到內部佇列中。當EventLoop下次處理它的事件時,它會執行佇列中的那些任務/事件。這也就解釋了任何的Thread是如何與Channel直接互動而無需在ChannelHandler中進行額外同步的。

    每個EventLoop都有它自已的任務佇列,獨立於任何其他的EventLoop。

永遠不要將一個長時間執行的任務放入到執行佇列中,因為它將阻塞需要在同一執行緒上執行的任何其他任務。

如果必須要進行阻塞呼叫或者執行長時間執行的任務,建議使用一個專門的EventExecutor。

ChannelHandler的執行和阻塞

    通常ChannelPipeline中的每一個ChannelHandler都是通過它的EventLoop(I/O 執行緒)來處理傳遞給它的事件的。所以至關重要的是不要阻塞這個執行緒,因為這會對整體的I/O 處理產生負面的影響。但有時可能需要與那些使用阻塞API  的遺留程式碼進行互動。對於這種情況,ChannelPipeline有一些接受一個EventExecutorGroup的add()方法。如果一個事件被傳遞給一個自定義的EventExecutorGroup,它將被包含在這個EventExecutorGroup中的某個EventExecutor所處理,從而被從該Channel本身的EventLoop中移除。對於這種用例,Netty提供了一個叫DefaultEventExecutorGroup的預設實現。

EventLoop/執行緒的分配

    服務於Channel的I/O和事件的EventLoop包含在EventLoopGroup中。根據不同的傳輸實現,EventLoop的建立和分配方式也不同。

非同步傳輸

    非同步傳輸實現只使用了少量的EventLoop(以及和它們相關聯的Thread), 而且在當前的執行緒模型中,它們可能會被多個Channel所共享。這使得可以通過儘可能少量的Thread來支撐大量的Channel,而不是每個Channel分配一個Thread。

    EventLoopGroup負責為每個新建立的Channel分配一個EventLoop。在當前實現中,使用順序迴圈(round-robin)的方式進行分配以獲取一個均衡的分佈,並且相同的EventLoop可能會被分配給多個Channel。(這一點在將來的版本中可能會改變。)

    一旦一個Channel被分配給一個EventLoop,它將在它的整個生命週期中都使用這個EventLoop(以及相關聯的Thread) 。

    需要注意的是,EventLoop的分配方式對ThreadLocal的使用的影響。因為一個EventLoop通常會被用於支撐多個Channel,所以對於所有相關聯的Channel來說,ThreadLocal都將是一樣的。這使得它對於實現狀態追蹤等功能來說是個糟糕的選擇。然而,在一些無狀態的上下文中,它仍然可以被用於在多個Channel之間共享一些重度的或者代價昂貴的物件,甚至是事件。

阻塞傳輸

用於像OIO(  舊的阻塞I/O)這樣的其他傳輸的設計略有不同。這裡每一個Channel都將被分配給一個EventLoop(以及它的Thread)。

    但是,正如同之前一樣,得到的保證是每個Channel的I/O事件都將只會被一個Thread(用於支撐該Channel的EventLoop的那個Thread)處理。

參考《Netty實戰》

附:

package netty.in.action;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.nio.charset.Charset;

public class EchoServer {

    final ByteBuf bufs= Unpooled.copiedBuffer("Hello,劉德華", Charset.forName("UTF-8"));
    public void bind(int port) throws Exception {
        //配置服務端的NIO執行緒組
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 100)
                    .childHandler(new ChildChannelHandler());

            // 繫結埠,同步等待成功
            ChannelFuture f=b.bind(port).sync();

            //等待服務端監聽埠關閉
            f.channel().closeFuture().sync();
        } finally {
            //退出,釋放執行緒池資源
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("服務端啟動……");
            ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                @Override
                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    if(buf.hasArray()){
                        byte[] array=buf.array();//返回該緩衝區的備份位元組陣列。
                        int offset=buf.arrayOffset()+buf.readerIndex();//計算第一個位元組的偏移量
                        int length=buf.readableBytes();//獲取可讀位元組數
                        String s=new String(array,offset,length);
                        System.out.println("s="+s);
                    }else{
                        byte[] array = new byte[buf.readableBytes()];//獲取可讀位元組數並分配一個新的陣列來儲存
                        buf.getBytes(buf.readerIndex(),array);//將位元組複製到該陣列
                        String s=new String(array,0,buf.readableBytes());
                        System.out.println("直接緩衝區:"+s);
                    }
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println(body);
                    bufs.retain();//引用計數器加一
                    ChannelFuture future=ctx.writeAndFlush(bufs);
                    future.addListener(new ChannelFutureListener() {
                        @Override
                        public void operationComplete(ChannelFuture future) throws Exception {
                            if (future.isSuccess())
                                System.out.println("成功");
                            else{
                                System.out.println("失敗");
                                future.cause().printStackTrace();
                                future.channel().close();
                            }
                        }
                    });
//                    ctx.close();
                }
            });
        }
    }

    public static void main(String[] args) throws Exception {
        int port=8080;
        new EchoServer().bind(port);
    }
}

package netty.in.action;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufProcessor;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.ByteProcessor;
import io.netty.util.concurrent.ScheduledFuture;

import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class EchoClient {
    final ByteBuf buf= Unpooled.copiedBuffer("Hello,王寶強", Charset.forName("UTF-8"));
    public void connect(int port, String host) throws Exception {
        // 配置客戶端NIO執行緒組
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChildChannelHandler2() );
            // 發起非同步連線操作
            ChannelFuture f = b.connect(host, port).sync();
            // 等待客戶端鏈路關閉
            f.channel().closeFuture().sync();
        } finally {
            // 優雅退出,釋放NIO執行緒組
            group.shutdownGracefully();
        }
    }

    private class ChildChannelHandler2 extends ChannelInitializer<SocketChannel> {

        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("客戶端啟動……");
            ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    ctx.writeAndFlush(buf);
                }
                public void channelRead(ChannelHandlerContext ctx, Object msg) throws UnsupportedEncodingException {
                    ByteBuf buf = (ByteBuf) msg;
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println(body);
                    ScheduledFuture<?> future=ctx.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("開啟非同步執行緒");
                        }
                    },1,TimeUnit.SECONDS);
//                    future.cancel(false);
                    ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
                        @Override
                        public void run() {
                            ctx.writeAndFlush(Unpooled.copiedBuffer("傳送心跳"+new Date(), Charset.forName("UTF-8")));
                        }
                    },1,5, TimeUnit.SECONDS);
                }
            });
        }
    }

    private class ChildChannelHandler extends ChannelInitializer<SocketChannel> {

        protected void initChannel(SocketChannel ch) throws Exception {
            System.out.println("客戶端啟動……");
            ByteBuf bufs= Unpooled.copiedBuffer("pipeline傳送的資料->", Charset.forName("UTF-8"));
            ch.pipeline().write(bufs);//通過呼叫ChannelPipeline的write方法將資料寫入通道,但是不重新整理
            ch.pipeline().addLast("text",new ChannelInboundHandlerAdapter() {
                @Override
                public void channelActive(ChannelHandlerContext ctx) throws Exception {
                    ctx.channel().write(Unpooled.copiedBuffer("通過ChannelHandlerContext獲取的channel傳送的訊息->",
                            Charset.forName("UTF-8")));//通過ChannelHandlerContext獲取的channel傳送的訊息->
                    CompositeByteBuf messageBuf=Unpooled.compositeBuffer();
                    ByteBuf headerBuf=buf;
                    ByteBuf bodyBuf=buf;
                    messageBuf.addComponent(bodyBuf);//將ByteBuf例項追加到CompositeByteBuf
                    messageBuf.addComponent(headerBuf);
                    for (ByteBuf buf:messageBuf){//遍歷所有ByteBuf
                        System.out.println(buf);
                        byte[] req = new byte[buf.readableBytes()];
                        buf.readBytes(req);
                        String body = new String(req, "UTF-8");
                        System.out.println("複合緩衝區:"+body);
                    }
                    ctx.writeAndFlush(buf);
                }

                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    ByteBuf copyBuf=((ByteBuf) msg).copy();
//                    System.out.println(buf.refCnt());//返回此物件的引用計數。如果為0,則表示此物件已被釋放。
//                    buf.release();//釋放引用計數物件
                    for (int i = 0; i < buf.capacity(); i++) {
                        byte b=buf.getByte(i);
                        if((char)b>='a'&&(char)b<='z'||(char)b>='A'&&(char)b<='Z'||(char)b==',')
                        System.out.println("i="+(char)b);
                    }
                    int i=buf.forEachByte(new ByteProcessor() {
                        @Override
                        public boolean process(byte value) throws Exception {
                            byte[] b=",".getBytes();
                            if (b[0]!=value)
                                return true;
                            else
                                return false;
                        }
                    });
                    System.out.println("i="+i+" value="+(char) buf.getByte(i));
                    ByteBuf sliced = buf.slice(0,2);
                    sliced.setByte(0,(byte)'h');
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println(body);
                    ctx.fireChannelRead(copyBuf);
                }
            });
            ch.pipeline().addLast("text2",new ChannelInboundHandlerAdapter(){
                public void channelRead(ChannelHandlerContext ctx, Object msg)
                        throws Exception {
                    ByteBuf buf = (ByteBuf) msg;
                    byte[] req = new byte[buf.readableBytes()];
                    buf.readBytes(req);
                    String body = new String(req, "UTF-8");
                    System.out.println("text2:"+body);
                    ByteBuf bufs= Unpooled.copiedBuffer("test2傳送的資料", Charset.forName("UTF-8"));
                    ctx.writeAndFlush(bufs);

                    ctx.close();
                }
            });
//            ch.pipeline().remove("text2");
        }
    }

    public static void main(String[] args) throws Exception {
        int port = 8080;
        new EchoClient().connect(port, "127.0.0.1");
    }
}