1. 程式人生 > >Jetty原始碼分析之執行緒池:QueuedThreadPool

Jetty原始碼分析之執行緒池:QueuedThreadPool

前面分析Jetty整體架構的時候介紹過Jetty的三大元件:Acceptor、Handler和ThreadPool;前兩者工作的時候都是需要執行緒的,而所需的執行緒正是從ThreadPool中獲取的。這篇檔案就是來分析ThreadPool的一個具體實現:QueuedThreadPool。下面是它的類圖:

這裡寫圖片描述

繼承了父類AbstractLifeCycle之後,QueuedThreadPool就可以當成一個LifeCycle型別的元件管理,這個父類在前面介紹生命週期的時候已經介紹過了,這裡就不重複介紹了。

ThreadPool是一個介面,裡面定義了一些操作和獲取執行緒資訊的方法,它的完整定義如下:

public interface ThreadPool
{
    //將傳入的任務進行分派
    public abstract boolean dispatch(Runnable job);


    /**
     * Blocks until the thread pool is {@link LifeCycle#stop stopped}.
     */
    public void join() throws InterruptedException;

   //返回當前執行緒池中的匯流排程數目
    public int getThreads();

    //返回執行緒池中空閒的執行緒數量
public int getIdleThreads(); /** * @return True if the pool is low on threads */ public boolean isLowOnThreads();

上面的幾個方法定義都很簡單,沒有什麼很難理解的方法。

SizedThreadPool是ThreadPool的一個內部介面,它在ThreadPool的基礎上增加了數量限制,可以認為是一個數量有限的執行緒池,可以指定這個執行緒池中的最小和最大執行緒數。下面是其完整定義,可以看到也很簡單。

public interface
SizedThreadPool extends ThreadPool {
public int getMinThreads();//獲取限制的最小執行緒數量 public int getMaxThreads(); //獲取限制的最大執行緒數量 public void setMinThreads(int threads);//設定最小執行緒數量 public void setMaxThreads(int threads);//設定最大執行緒數量 }

Executor是java.util.concurrent包下的一個介面,介面中只有一個方法如下:

public interface Executor {

    void execute(Runnable command);
}

execute()方法接受一個Runnable型別的物件,然後負責在一個執行緒中執行這個task。至於這個執行緒就是當前呼叫execute()方法的執行緒還是另外分配的一個執行緒,都是由具體實現的子類決定的。這種方式的好處在於呼叫者不用再自己建立執行緒,執行緒的管理完全都有Executor的子類負責。顯然QueuedThreadPool是很適合這種場景的。

上面分析完繼承的父類之後就發現定義都比較簡單,沒有什麼特別難理解的方法或者呼叫關係。所以下面來分析QueuedThreadPool中是如何實現這幾個父類中的方法的。

1) doStart()方法
doStart()方法時在容器啟動的時候就會被呼叫的一個方法,QueuedThreadPool和其它元件一樣在這個方法中進行初始化。

   @Override
    protected void doStart() throws Exception
    {
        //super.doStart()會呼叫AbstractLifeCycle的doStart()方法,
        //而那個方法中是沒有做任何事情的。
        super.doStart();
        //將啟動的執行緒數設定為0
        _threadsStarted.set(0);
        //_jobs是QueuedThreadPool用來儲存傳入Runnable物件的資料結構,可以在jetty的xml配置檔案中指定具體型別,如果沒有指定則會根據_maxQueued屬性的值來選擇,具體如下。
        if (_jobs==null)
        {
            _jobs=_maxQueued>0 ?new ArrayBlockingQueue<Runnable>(_maxQueued)
                :new BlockingArrayQueue<Runnable>(_minThreads,_minThreads);
        }

        //如果當前啟動的執行緒數小於設定的最小值,則不斷啟動新的執行緒,
        //直到執行緒池中有_minThreads個執行緒
        int threads=_threadsStarted.get();
        while (isRunning() && threads<_minThreads)
        {
            startThread(threads);//startThread方法負責啟動新執行緒
            threads=_threadsStarted.get();
        }
    }

上面這段程式碼邏輯很簡單,需要注意的幾個點如下:

  • QueuedThreadPool中有兩個屬性來記錄當前已啟動和空閒的執行緒數,它們都是執行緒安全的AtomicInteger型別變數,如下:
private final AtomicInteger _threadsStarted = new       AtomicInteger(); //記錄已啟動的執行緒數量
    private final AtomicInteger _threadsIdle = new AtomicInteger(); //記錄空閒的執行緒數量 
  • ArrayBlockingQueue是java.util.concurrent包下用陣列實現的阻塞佇列,效能不是很好。而BlockingArrayQueue是jetty內部基於迴圈資料實現的一個阻塞佇列,效能會好一點。

下面來看下startThread(int threads)方法中的邏輯:

private boolean startThread(int threads)
    {
        final int next=threads+1;
        //原子變數的先比較後更新操作,如果比較失敗,說明有其它執行緒在併發操作執行緒池,這種情況下如果不返還則會導致_threadsStarted記錄的啟動執行緒數目出錯。如果將整個startThread()方法都進行加鎖是可以避免這種情況的,但是那樣的話會極大的降低併發性。
        if (!_threadsStarted.compareAndSet(threads,next))
            return false;
        boolean started=false;
        try
        {
            Thread thread=newThread(_runnable);
            thread.setDaemon(_daemon);
            thread.setPriority(_priority);
            thread.setName(_name+"-"+thread.getId());
            _threads.add(thread);

            thread.start();
            started=true;
        }
        finally
        {
            if (!started)
                _threadsStarted.decrementAndGet();
        }
        return started;
    }

整個方法就是新建一個執行緒並啟動然後將其加入到執行緒池_threads中。_threads是QueuedThreadPool中用來儲存執行緒物件的容器,是一個無限容量的佇列。

    private final ConcurrentLinkedQueue<Thread> _threads=new ConcurrentLinkedQueue<Thread>();

下面來重點關注下建立執行緒的時候,傳遞給執行緒的Runnable物件_runnable中的邏輯。這個物件的定義如下:

  private Runnable _runnable = new Runnable()
    {
        public void run()
        {
            boolean shrink=false;
            try
            {  
                //從任務佇列中取出一個任務來,poll()是非阻塞的操作,沒有元素時返回null
                Runnable job=_jobs.poll();
                while (isRunning())
                {
                    // 如果任務佇列中一直有任務,則不斷取出其中的任務執行
                    while (job!=null && isRunning())
                    {
                        runJob(job);//其實就是呼叫job.run()方法
                        job=_jobs.poll();
                    }

                    // 任務佇列中沒有任務時的空閒迴圈
                    try
                    {
                        //增加空閒執行緒的數量
                        _threadsIdle.incrementAndGet();

                        while (isRunning() && job==null)
                        {
                            //將空閒等待時間的設為不大於0的情況下(預設是1分鐘),則當前執行緒會一直阻塞在任務佇列的take()操作上.
                            if (_maxIdleTimeMs<=0)
                                job=_jobs.take();
                            else
                            {
                                // 下面是是否要收縮執行緒池的判斷
                                final int size=_threadsStarted.get();//目前以窮的執行緒數
                                if (size>_minThreads)
                                {
                                    long last=_lastShrink.get(); //上次進行收縮執行緒池操作的時間,未進行過則為0
                                    long now=System.currentTimeMillis();
                                    if (last==0 || (now-last)>_maxIdleTimeMs)
                                    {

//最後設定_lastShrink和_threadsStarted的數目,並且使用的都是原子變數的compareAndSet型別操作,防止併發修改的問題。                         shrink=_lastShrink.compareAndSet(last,now) &&
                                        _threadsStarted.compareAndSet(size,size-1);
                                        //如果確實可以收縮
                                        if (shrink)
                                            return;
                                    }
                                }
                                //如果不滿足收縮的條件,則通過_jobs.poll(_maxIdleTimeMs,TimeUnit.MILLISECONDS)方法阻塞_maxIdleTimeMs,如果再次期間拿到job則返回job迴圈,否則再進行一次空閒迴圈的檢查。
                                job=idleJobPoll();
                            }
                        }
                    }
                    finally
                       //移除成功,減少空閒執行緒計數
                        _threadsIdle.decrementAndGet();
                    }
                }
            }
            catch(InterruptedException e)
            {
                LOG.ignore(e);
            }
            catch(Exception e)
            {
                LOG.warn(e);
            }
            finally
            {//進入這個finally有兩種情況,第一種是收縮執行緒池了(shrink為true),另一種是異常退出了或改變伺服器狀態退出了,這種情況下需要減少已啟動執行緒計數。
                if (!shrink)
                    _threadsStarted.decrementAndGet();
                 //直接將當前執行緒物件從執行緒池中移除,隨後會被垃圾回收。
                _threads.remove(Thread.currentThread());
            }
        }
    };

首先可以看到執行緒池和外部沒有直接進行互動(並不是從執行緒池中取出Thread物件給外部使用);而是通過_jobs這麼個阻塞佇列和外部進行互動,具體來說,外部有任務需要安排一個執行緒執行的時候就將任務加入到佇列中(如果佇列已滿則會新增失敗),而執行緒池中的執行緒會每隔一段時間就檢查一次任務佇列,如果有任務需要執行則會取出進行執行(job迴圈)。對於佇列中沒有任務需要處理的情況,可以通過設定_maxIdleTimeMs的值來控制執行緒的表現:如果_maxIdleTimeMs的值小於0,則執行緒會一直阻塞在_jobs.take()方法上;如果_maxIdleTimeMs的值大於0,則會先檢查是否可以收縮執行緒池(檢查的標準就是上次收縮的時間到目前要大於_maxIdleTimeMs並且當前啟動的執行緒數目大於_minThreads),如果可以收縮則當前執行緒會被從執行緒池中移除,如果不可以則當前執行緒會在_jobs.poll()方法上阻塞_maxIdleTimeMs時間,如果在這段時間裡這個方法返回一個job,則進行入job迴圈,否則繼續上面的迴圈。

2) dispatch()和execute()方法
這兩個方法都是執行緒池對外提供的執行方法,接受的引數都是一個Runnable物件。實際上execute()方法是通過dispatch()方法實現的:

public void execute(Runnable job)
    {
        if (!dispatch(job))
            throw new RejectedExecutionException();
    }

可以看到是直接呼叫的dispatch()方法。dispatch()方法的原始碼如下:

public boolean dispatch(Runnable job)
    {
        if (isRunning())
        {
            final int jobQ = _jobs.size();
            final int idle = getIdleThreads();
            if(_jobs.offer(job)) //offer是非阻塞操作,如果底層佇列空間不夠,則立即返回false
            {
                //如果沒有空閒執行緒或當前佇列中等待的任務數大於空閒的執行緒數,並且執行緒池容量還沒達到_maxThreads的時候會新增一個處理執行緒。
                if (idle==0 || jobQ>idle)
                {
                    int threads=_threadsStarted.get();
                    if (threads<_maxThreads)
                        startThread(threads);
                }
                return true;
            }
        }
        LOG.debug("Dispatched {} to stopped {}",job,this);
        return false;
    }

3) setMinThreads()方法
最後看一下setMinThreads()方法:

public void setMinThreads(int minThreads)
    {
        _minThreads=minThreads;

        if (_minThreads>_maxThreads)
            _maxThreads=_minThreads;

        int threads=_threadsStarted.get();
        while (isStarted() && threads<_minThreads)
        {
            startThread(threads);
            threads=_threadsStarted.get();
        }
    }

這個方法就幾行程式碼,值得注意的是在啟動之後可以通過擴大_minThreads的值來實現執行緒池的動態擴大。

上面幾個方法分析完,QueuedThreadPool也就算分析完了,說實話QueuedThreadPool的邏輯比前面那些Handler簡單多了,所以分析原始碼也輕鬆很多。不過雖然QueuedThreadPool的邏輯很簡單,但是併發效能可是很不錯的。