[1] Netty 4 server startup source analysis: thread creation

Reposted from http://xw-z1985.iteye.com/blog/1925013

This article analyzes how Netty creates the boss and worker threads.

The line below is taken from the server startup code; thread creation happens inside it.

EventLoopGroup bossGroup = new NioEventLoopGroup();

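The class hierarchy of NioEventLoopGroup is shown below:

[Figure: NioEventLoopGroup class diagram]

The constructors are invoked in the following order: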

// NioEventLoopGroup  
public NioEventLoopGroup() {  
        this(0);  
    }  
  
public NioEventLoopGroup(int nThreads) {  
        this(nThreads, null);  
    }  
  
public NioEventLoopGroup(int nThreads, ThreadFactory threadFactory) {  
        this(nThreads, threadFactory, SelectorProvider.provider());  
    }  
  
public NioEventLoopGroup(  
            int nThreads, ThreadFactory threadFactory, final SelectorProvider selectorProvider) {  
        super(nThreads, threadFactory, selectorProvider);  
    }  

Next, look at the constructor of the parent class, MultithreadEventLoopGroup:

// MultithreadEventLoopGroup  
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  
        super(nThreads == 0? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);  
    }  

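Note: if the number of threads is not specified (nThreads == 0), the number created defaults to DEFAULT_EVENT_LOOP_THREADS, whose value is the number of processors x 2.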
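The defaulting rule in the note above can be sketched with the JDK alone. This is a minimal illustration, not Netty's code: the class name ThreadCountSketch and the method resolveThreadCount are hypothetical, and the constant is assumed to be exactly twice the processor count, as the note states.

```java
/** Sketch of the thread-count defaulting described above; not Netty's actual code. */
final class ThreadCountSketch {
    // Assumption: the default is twice the number of available processors.
    static final int DEFAULT_EVENT_LOOP_THREADS =
            Runtime.getRuntime().availableProcessors() * 2;

    /** Returns the default when nThreads is 0, otherwise nThreads unchanged. */
    static int resolveThreadCount(int nThreads) {
        return nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads;
    }
}
```

Calling ThreadCountSketch.resolveThreadCount(0) corresponds to new NioEventLoopGroup(), while any positive argument is passed through unchanged.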

Now look at the constructor of MultithreadEventExecutorGroup, the parent class of MultithreadEventLoopGroup:

/** 
    * Create a new instance. 
    * 
    * @param nThreads          the number of threads that will be used by this instance. 
    * @param threadFactory     the ThreadFactory to use, or {@code null} if the default should be used. 
    * @param args              arguments which will passed to each {@link #newChild(ThreadFactory, Object...)} call 
    */  
   protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {  
       if (nThreads <= 0) {  
           throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));  
       }  
  
       if (threadFactory == null) {  
           threadFactory = newDefaultThreadFactory();  
       }  
  
       children = new SingleThreadEventExecutor[nThreads];  
       for (int i = 0; i < nThreads; i ++) {  
           boolean success = false;  
           try {  
               children[i] = newChild(threadFactory, args);  
               success = true;  
           } catch (Exception e) {  
               // TODO: Think about if this is a good exception type  
               throw new IllegalStateException("failed to create a child event loop", e);  
           } finally {  
               if (!success) {  
                   for (int j = 0; j < i; j ++) {  
                       children[j].shutdownGracefully();  
                   }  
  
                   for (int j = 0; j < i; j ++) {  
                       EventExecutor e = children[j];  
                       try {  
                           while (!e.isTerminated()) {  
                               e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);  
                           }  
                       } catch (InterruptedException interrupted) {  
                           Thread.currentThread().interrupt();  
                           break;  
                       }  
                   }  
               }  
           }  
       }  
  
       final FutureListener<Object> terminationListener = new FutureListener<Object>() {  
           @Override  
           public void operationComplete(Future<Object> future) throws Exception {  
               if (terminatedChildren.incrementAndGet() == children.length) {  
                   terminationFuture.setSuccess(null);  
               }  
           }  
       };  
  
       for (EventExecutor e: children) {  
           e.terminationFuture().addListener(terminationListener);  
       }  
   }  

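The children variable is the array that holds the created executors, each of which owns one thread. Every element is created by children[i] = newChild(threadFactory, args), and the newChild method is implemented by the subclass NioEventLoopGroup: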

// NioEventLoopGroup  
  protected EventExecutor newChild(  
            ThreadFactory threadFactory, Object... args) throws Exception {  
        return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);  
    } 
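The split between the parent constructor, which owns the loop, and the subclass, which decides what each child is, is a factory-method arrangement. The following JDK-only sketch illustrates it under stated assumptions: GroupSketch and NioGroupSketch are hypothetical names, and the children are plain strings rather than real event loops.

```java
/** Hypothetical stand-in for MultithreadEventExecutorGroup: the base class owns the loop. */
abstract class GroupSketch {
    private final String[] children;

    protected GroupSketch(int nThreads, Object... args) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException("nThreads: " + nThreads + " (expected: > 0)");
        }
        children = new String[nThreads];
        for (int i = 0; i < nThreads; i++) {
            // Dispatches to the subclass, which decides the concrete child type.
            children[i] = newChild(args);
        }
    }

    /** Factory method implemented by the subclass. */
    protected abstract String newChild(Object... args);

    String[] children() {
        return children.clone();
    }
}

/** Hypothetical stand-in for NioEventLoopGroup. */
final class NioGroupSketch extends GroupSketch {
    NioGroupSketch(int nThreads, String providerName) {
        super(nThreads, providerName);
    }

    @Override
    protected String newChild(Object... args) {
        // Mirrors the (SelectorProvider) args[0] cast in the real newChild.
        return "event-loop(" + (String) args[0] + ")";
    }
}
```

Because newChild runs inside the parent constructor, the subclass implementation here relies only on its arguments and not on its own fields, which would not be initialized yet at that point.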

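The actual type of each element is NioEventLoop, whose class hierarchy is shown below:

[Figure: NioEventLoop class diagram]

(Note: the hierarchy is a little odd. For example, EventLoop extends EventLoopGroup, and I do not know why. Is it only because EventLoop has a parent() method that returns an EventLoopGroup? To be confirmed later.)

Next, look at the NioEventLoop constructor: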

// NioEventLoop  
 NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {  
        super(parent, threadFactory, false);  
        if (selectorProvider == null) {  
            throw new NullPointerException("selectorProvider");  
        }  
        provider = selectorProvider;  
        selector = openSelector();  
    }  

First, analyze selector = openSelector():

// NioEventLoop  
 private Selector openSelector() {  
        final Selector selector;  
        try {  
            selector = provider.openSelector();  
        } catch (IOException e) {  
            throw new ChannelException("failed to open a new selector", e);  
        }  
  
        if (DISABLE_KEYSET_OPTIMIZATION) {  
            return selector;  
        }  
  
        try {  
            SelectedSelectionKeySet selectedKeySet = new SelectedSelectionKeySet();  
  
            Class<?> selectorImplClass =  
                    Class.forName("sun.nio.ch.SelectorImpl", false, ClassLoader.getSystemClassLoader());  
            selectorImplClass.isAssignableFrom(selector.getClass());  
            Field selectedKeysField = selectorImplClass.getDeclaredField("selectedKeys");  
            Field publicSelectedKeysField = selectorImplClass.getDeclaredField("publicSelectedKeys");  
  
            selectedKeysField.setAccessible(true);  
            publicSelectedKeysField.setAccessible(true);  
  
            selectedKeysField.set(selector, selectedKeySet);  
            publicSelectedKeysField.set(selector, selectedKeySet);  
  
            selectedKeys = selectedKeySet;  
            logger.trace("Instrumented an optimized java.util.Set into: {}", selector);  
        } catch (Throwable t) {  
            selectedKeys = null;  
            logger.trace("Failed to instrument an optimized java.util.Set into: {}", selector, t);  
        }  
  
        return selector;  
    }  

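Here Netty optimizes the selectedKeys and publicSelectedKeys fields of sun.nio.ch.SelectorImpl by replacing both with NioEventLoop's own selectedKeys variable, whose type is SelectedSelectionKeySet. What does the optimization consist of? (Two arrays used internally for storage? An initial array size of 1024 to avoid frequent resizing? Doubling the array once the size exceeds 1024?)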
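To make the questions above concrete, here is a sketch of what an array-backed key set with an initial capacity of 1024 and doubling growth could look like. It is an illustration of the idea only, not Netty's SelectedSelectionKeySet: the class name ArrayBackedKeySetSketch, the capacity() and reset() helpers, and the use of a single array instead of two are all assumptions made for brevity.

```java
import java.util.AbstractSet;
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical add-only set backed by a flat array; not Netty's actual class. */
final class ArrayBackedKeySetSketch<E> extends AbstractSet<E> {
    private Object[] keys = new Object[1024]; // assumed initial capacity
    private int size;

    @Override
    public boolean add(E e) {
        if (e == null) {
            return false;
        }
        if (size == keys.length) {
            keys = Arrays.copyOf(keys, keys.length << 1); // double when full
        }
        // Assumption: callers never add the same key twice, so no duplicate check or hashing.
        keys[size++] = e;
        return true;
    }

    int capacity() {
        return keys.length;
    }

    /** Clears the set for the next round while keeping the allocated array. */
    void reset() {
        Arrays.fill(keys, 0, size, null);
        size = 0;
    }

    @Override
    public int size() {
        return size;
    }

    @Override
    public Iterator<E> iterator() {
        return new Iterator<E>() {
            private int index;

            @Override
            public boolean hasNext() {
                return index < size;
            }

            @Override
            @SuppressWarnings("unchecked")
            public E next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return (E) keys[index++];
            }
        };
    }
}
```

Compared with a HashSet, adding to such a structure is a plain array write with no hashing and no per-entry allocation, which is the kind of saving the questions above point at.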

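Because the set is installed through reflection, whenever a SelectionKey registered with the selector becomes ready, the selector places it in selectedKeys, so the set is no longer empty. The event loop later dispatches events based on selectedKeys.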
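The reflection technique itself can be shown without touching JDK internals. In the sketch below, FakeSelector is a hypothetical stand-in for sun.nio.ch.SelectorImpl, and the instrument method is an illustrative name; only the java.lang.reflect calls are real API. After the swap, anything the "selector" records as ready becomes visible through the set held by the caller.

```java
import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.Set;

final class ReflectionSwapSketch {
    /** Hypothetical stand-in for sun.nio.ch.SelectorImpl. */
    static final class FakeSelector {
        private Set<String> selectedKeys = new HashSet<>();

        /** Simulates the selector recording a ready key. */
        void markReady(String key) {
            selectedKeys.add(key);
        }
    }

    /** Replaces the private selectedKeys field with our own set and returns that set. */
    static Set<String> instrument(FakeSelector selector) {
        Set<String> ours = new LinkedHashSet<>();
        try {
            Field field = FakeSelector.class.getDeclaredField("selectedKeys");
            field.setAccessible(true);
            field.set(selector, ours);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("failed to instrument selector", e);
        }
        return ours;
    }
}
```

The caller keeps the returned reference, just as NioEventLoop keeps selectedKeys, and reads ready keys from it directly instead of asking the selector for them.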

Finally, analyze super(parent, threadFactory, false), that is, the constructor of the parent class SingleThreadEventExecutor:

/** 
    * Create a new instance 
    * 
    * @param parent            the {@link EventExecutorGroup} which is the parent of this instance and belongs to it 
    * @param threadFactory     the {@link ThreadFactory} which will be used for the used {@link Thread} 
    * @param addTaskWakesUp    {@code true} if and only if invocation of {@link #addTask(Runnable)} will wake up the 
    *                          executor thread 
    */  
   protected SingleThreadEventExecutor(  
           EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {  
  
       if (threadFactory == null) {  
           throw new NullPointerException("threadFactory");  
       }  
  
       this.parent = parent;  
       this.addTaskWakesUp = addTaskWakesUp;  
  
       thread = threadFactory.newThread(new Runnable() {  
           @Override  
           public void run() {  
               boolean success = false;  
               updateLastExecutionTime();  
               try {  
                   SingleThreadEventExecutor.this.run();  
                   success = true;  
               } catch (Throwable t) {  
                   logger.warn("Unexpected exception from an event executor: ", t);  
               } finally {  
                   if (state < ST_SHUTTING_DOWN) {  
                       state = ST_SHUTTING_DOWN;  
                   }  
  
                   // Check if confirmShutdown() was called at the end of the loop.  
                   if (success && gracefulShutdownStartTime == 0) {  
                       logger.error(  
                               "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +  
                               SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +  
                               "before run() implementation terminates.");  
                   }  
  
                   try {  
                       // Run all remaining tasks and shutdown hooks.  
                       for (;;) {  
                           if (confirmShutdown()) {  
                               break;  
                           }  
                       }  
                   } finally {  
                       try {  
                           cleanup();  
                       } finally {  
                           synchronized (stateLock) {  
                               state = ST_TERMINATED;  
                           }  
                           threadLock.release();  
                           if (!taskQueue.isEmpty()) {  
                               logger.warn(  
                                       "An event executor terminated with " +  
                                       "non-empty task queue (" + taskQueue.size() + ')');  
                           }  
  
                           terminationFuture.setSuccess(null);  
                       }  
                   }  
               }  
           }  
       });  
  
       taskQueue = newTaskQueue();  
   }  
  
   /** 
    * Create a new {@link Queue} which will holds the tasks to execute. This default implementation will return a 
    * {@link LinkedBlockingQueue} but if your sub-class of {@link SingleThreadEventExecutor} will not do any blocking 
    * calls on the this {@link Queue} it may make sense to {@code @Override} this and return some more performant 
    * implementation that does not support blocking operations at all. 
    */  
   protected Queue<Runnable> newTaskQueue() {  
       return new LinkedBlockingQueue<Runnable>();  
   }  

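The boss thread is created here: thread = threadFactory.newThread(new Runnable() { ... })

The thread's task queue is created at the same time; it is a LinkedBlockingQueue.

SingleThreadEventExecutor.this.run() is implemented by the subclass NioEventLoop and will be analyzed in a later article.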
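The shape of this constructor, one thread obtained from a ThreadFactory plus one LinkedBlockingQueue of tasks, can be sketched with the JDK alone. This is a simplified illustration under assumptions, not Netty's implementation: SingleThreadExecutorSketch, its method names, and the poison-pill shutdown are all hypothetical, and the graceful-shutdown and cleanup logic of the real class is left out.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;

/** Hypothetical one-thread executor: a thread plus a blocking task queue. */
final class SingleThreadExecutorSketch {
    // Sentinel task that tells the loop to exit (an assumption of this sketch).
    private static final Runnable POISON = () -> { };

    private final LinkedBlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>();
    private final Thread thread;

    SingleThreadExecutorSketch(ThreadFactory threadFactory) {
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        // The thread is only created here; it does not run until start() is called.
        thread = threadFactory.newThread(this::runLoop);
    }

    /** Takes tasks from the queue one at a time until the sentinel arrives. */
    private void runLoop() {
        try {
            for (;;) {
                Runnable task = taskQueue.take();
                if (task == POISON) {
                    return;
                }
                task.run();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void execute(Runnable task) {
        taskQueue.add(task);
    }

    void start() {
        thread.start();
    }

    /** Enqueues the sentinel and waits for the thread to finish. */
    void shutdownAndAwait() {
        taskQueue.add(POISON);
        try {
            thread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    Thread.State threadState() {
        return thread.getState();
    }
}
```

A caller would construct it with Executors.defaultThreadFactory(), queue tasks with execute, then call start and shutdownAndAwait. Note that creating the thread and starting it are separate steps, which matches the constructor above: it calls newThread but never start.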

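Summary:

EventLoopGroup bossGroup = new NioEventLoopGroup() does the following:

  1. It creates NioEventLoop instances for the NioEventLoopGroup, (number of processors x 2) of them by default. Each NioEventLoop instance holds one thread and one task queue of type LinkedBlockingQueue.

  2. The logic executed by each thread is implemented by NioEventLoop.

  3. Each NioEventLoop instance holds a selector and optimizes it.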