1. 程式人生 > >Java執行緒池架構(二)多執行緒排程器

Java執行緒池架構(二)多執行緒排程器

在前面介紹了java的多執行緒的基本原理資訊:《Java執行緒池架構原理和原始碼解析》,本文對這個java本身的執行緒池的排程器做一個簡單擴充套件,如果還沒讀過上一篇文章,建議讀一下,因為這是排程器的核心元件部分。

我們如果要用java預設的執行緒池來做排程器,一種選擇就是Timer和TimerTask的結合,在以前的文章:《Timer與TimerTask的真正原理&使用介紹》中有明確的說明:一個Timer為一個單獨的執行緒,雖然一個Timer可以排程多個TimerTask,但是對於一個Timer來講是序列的,至於細節請參看對應的那篇文章的內容,本文介紹的多執行緒排程器,也就是定時任務,基於多執行緒排程完成,當然你可以為了完成多執行緒使用多個Timer,只是這些Timer的管理需要你來完成,不是一個框架體系,而ScheduleThreadPoolExecutor提供了這個功能,所以我們第一要搞清楚是如何使用排程器的,其次是需要知道它的內部原理是什麼,也就是知其然,再知其所以然!

首先如果我們要建立一個基於java本身的排程池通常的方法是:

Executors.newScheduledThreadPool(int);

當有過載方法,我們最常用的是這個就從這個,看下定義:

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);
}

其實內部是new了一個例項化物件出來,並傳入大小,此時就跟蹤到ScheduledThreadPoolExecutor的構造方法中:

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());

}

你會發現呼叫了super,而super你跟蹤進去會發現,是ThreadPoolExecutor中,那麼ScheduledThreadPoolExecutor和ThreadPoolExecutor有何區別,就是本文要說得重點了,首先我們留下個引子,你發現在定義佇列的時候,不再是上文中提到的LinkedBlockingQueue,而是DelayedWorkQueue,那麼細節上我們接下來就是要講解的重點,既然他們又繼承關係,其實搞懂了不同點,就搞懂了共同點,而且有這樣的關係大多數應當是共同點,不同點的猜測:這個是要實現任務排程,任務排程不是立即的,需要延遲和定期做等情況,那麼是如何實現的呢?

這就是我們需要思考的了,通過原始碼考察,我們發現,他們都有execute方法,只是ScheduledThreadPoolExecutor將原始碼進行了重寫,並且還有以下四個排程器的方法:

public ScheduledFuture<?> schedule(Runnable command,
				       long delay, TimeUnit unit);

public  ScheduledFuture schedule(Callable callable,
					   long delay, TimeUnit unit);

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
						  long initialDelay,
						  long period,
						  TimeUnit unit);

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
						     long initialDelay,
						     long delay,
						     TimeUnit unit);

那麼這四個方法有什麼區別呢?其實第一個和第二個區別不大,一個是Runnable、一個是Callable,內部包裝後是一樣的效果;所以把頭兩個方法幾乎當成一種排程,那麼三種情況分別是:

1、 進行一次延遲排程:延遲delay這麼長時間,單位為:TimeUnit傳入的的一個基本單位,例如:TimeUnit.SECONDS屬於提供好的列舉資訊;(適合於方法1和方法2)。

2、 多次排程,每次依照上一次預計排程時間進行排程,例如:延遲2s開始,5s一次,那麼就是2、7、12、17,如果中間由於某種原因導致執行緒不夠用,沒有得到排程機會,那麼接下來計算的時間會優先計算進去,因為他的排序會被排在前面,有點類似Timer中的:scheduleAtFixedRate方法,只是這裡是多執行緒的,它的方法名也叫:scheduleAtFixedRate,所以這個是比較好記憶的(適合方法3)

3、 多次排程,每次按照上一次實際執行的時間進行計算下一次時間,同上,如果在第7秒沒有被得到排程,而是第9s才得到排程,那麼計算下一次排程時間就不是12秒,而是9+5=14s,如果再次延遲,就會延遲一個週期以上,也就會出現少呼叫的情況(適合於方法3);

4、 最後補充execute方法是一次排程,期望被立即排程,時間為空:

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

schedule(command, 0, TimeUnit.NANOSECONDS);

}

我們簡單看看scheduleAtFixedRate、scheduleWithFixedDelay對下面的分析會更加有用途:

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime(initialDelay, unit),
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Boolean>(command,
                                             null,
                                             triggerTime(initialDelay, unit),
                                             unit.toNanos(-delay)));
        delayedExecute(t);
        return t;
    }

兩段原始碼唯一的區別就是在unit.toNanos(int)這唯一一個地方,scheduleAtFixedRate裡面是直接傳入值,而scheduleWithFixedDelay裡面是取了相反數,也就是假如我們都傳入正數,scheduleWithFixedDelay其實就取反了,沒有任何區別,你是否聯想到前面文章介紹Timer中類似的處理手段通過正負數區分時間間隔方法,為0代表僅僅排程一次,其實在這裡同樣是這樣的,他們也同樣有一個問題就是,如果你傳遞負數,方法的功能正好是相反的。

而你會發現,不論是那個schedule方法裡頭,都會建立一個ScheduledFutureTask類的例項,此類究竟是何方神聖呢,我們來看看。

ScheduledFutureTask的類(ScheduleThreadPoolExecutor的私有的內部類)來進行排程,那麼可以看看內部做了什麼操作,如下:

        ScheduledFutureTask(Runnable r, V result, long ns) {
            super(r, result);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a periodic action with given nano time and period.
         */
        ScheduledFutureTask(Runnable r, V result, long ns, long period) {
            super(r, result);
            this.time = ns;
            this.period = period;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

        /**
         * Creates a one-shot action with given nanoTime-based trigger.
         */
        ScheduledFutureTask(Callable callable, long ns) {
            super(callable);
            this.time = ns;
            this.period = 0;
            this.sequenceNumber = sequencer.getAndIncrement();
        }

最核心的幾個引數正好對應了排程的延遲的構造方法,這些引數如何用起來的?那麼它還提供了什麼方法呢?

       public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber)
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

這裡發現了,他們可以執行,且判定時間的方法是getDelay方法我們知道了。 對比時間的方法是:compareTo,傳入了引數型別為:Delayed型別,不難猜測出,ScheduledFutureTask和Delayed有某種繼承關係,沒錯,ScheduledFutureTask實現了Delayed的介面,只是它是間接實現的;並且Delayed介面繼承了Comparable介面,這個介面可用來幹什麼?看過我前面寫的一篇文章關於中文和物件排序的應該知道,這個是用來自定義對比和排序的,我們的排程任務是一個物件,所以需要排序才行,接下來我們回溯到開始定義的程式碼中,找一個實際呼叫的程式碼來看看它是如何啟動到run方法的?如何排序的?如何呼叫延遲的?就是我們下文中會提到的,而這裡我們先提出問題,後文我們再來說明這些問題。 我們先來看下run方法的一些定義。

           
/**            * 時間片型別任務執行            */
          private void runPeriodic() {
             //執行對應的程式,這個是具體的程式
             boolean ok = ScheduledFutureTask.super.runAndReset();
             boolean down = isShutdown();
             // Reschedule if not cancelled and not shutdown or policy allows
             if (ok && (!down ||                        (getContinueExistingPeriodicTasksAfterShutdownPolicy() && 
!isStopped()))) {
                 long p = period;
                 if (p > 0)//規定時間間隔算出下一次時間
                    time += p;
                else//用當前時間算出下一次時間,負負得正
                    time = triggerTime(-p);
                //計算下一次時間,並資深再次放入等待佇列中
                ScheduledThreadPoolExecutor.super.getQueue().add(this);
            }
            else if (down)
                interruptIdleWorkers();
        }

        /**
         * 是否為逐片段執行,如果不是,則呼叫父親類的run方法
         */
        public void run() {
            if (isPeriodic())//週期任務
                runPeriodic();
            else//只執行一次的任務
                ScheduledFutureTask.super.run();
        }

可以看到run方法首先通過isPeriod()判定是否為時間片,判定的依據就是我們說的時間片是否“不為零”,如果不是週期任務,就直接執行一次,如果是週期任務,則除了執行還會計算下一次執行的時間,並將其再次放入等待佇列,這裡對應到scheduleAtFixedRate、scheduleWithFixedDelay這兩個方法一正一負,在這裡得到判定,並且將為負數的取反回來,負負得正,java就是這麼幹的,呵呵,所以不要認為什麼是不可能的,只要好用什麼都是可以的,然後計算的時間一個是基於標準的time加上一個時間片,一個是根據當前時間計算一個時間片,在上文中我們已經明確說明了兩者的區別。

以:schedule方法為例:

public  ScheduledFuture schedule(Callable callable,
                                           long delay,
                                           TimeUnit unit) {
        if (callable == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture t = decorateTask(callable,
            new ScheduledFutureTask(callable,
	   			       triggerTime(delay, unit)));
        delayedExecute(t);
        return t;
}

其實這個方法內部建立的就是一個我們剛才提到的:ScheduledFutureTask,外面又包裝了下叫做RunnableScheduledFuture,也就是適配了下而已,呵呵,程式碼裡面就是一個return操作,java這樣做的目的是方便子類去擴充套件。

關鍵是delayedExecute(t)方法中做了什麼?看名稱是延遲執行的意思,難道java的執行緒可以延遲執行,那所有的任務執行緒都在執行狀態?
它的原始碼是這樣的:

    private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }
        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();

        super.getQueue().add(command);
    }

我們主要關心prestartCoreThread()和super.getQueue().add(command),因為如果系統關閉,這些討論都沒有意義的,我們分別叫他們第二小段程式碼和第三小段程式碼。

第二個部分如果執行緒數小於核心執行緒數設定,那麼就呼叫一個prestartCoreThread(),看方法名應該是:預先啟動一個核心執行緒的意思,先看完第三個部分,再跟蹤進去看原始碼。

第三個部分很明瞭,就是呼叫super.getQueue().add(command);也就是說直接將任務放入一個佇列中,其實super是什麼?super就是我們上一篇文章所提到的ThreadPoolExecutor,那麼這個Queue就是上一篇文章中提到的等待佇列,也就是任何schedule任務首先放入等待佇列,然後等待被排程的。

    public boolean prestartCoreThread() {
        return addIfUnderCorePoolSize(null);
    }
    private boolean addIfUnderCorePoolSize(Runnable firstTask) {
        Thread t = null;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            if (poolSize < corePoolSize && runState == RUNNING)                 t = addThread(firstTask);         } finally {             mainLock.unlock();         }         if (t == null)             return false;         t.start();         return true; } 

這個程式碼是否似曾相似,沒錯,這個你在上一篇文章介紹ThreadPoolExecutor的時候就見到過,說明不論是ThreadPoolExecutor還是ScheduleThreadPoolExecutor他們的Thread都是由一個Worker來處理的(上一篇文章有介紹),而這個Worker處理的基本機制就是將當前任務執行後,不斷從執行緒等待佇列中獲取資料,然後用以執行,直到佇列為空為止。 那麼他們的區別在哪裡呢?延遲是如何實現的呢?和我們上面介紹的ScheduledFutureTask又有何關係呢? 那麼我們回過頭來看看ScheduleThreadPool的定義是如何的。

 public ScheduledThreadPoolExecutor(int corePoolSize) {
         super(corePoolSize, Integer.MAX_VALUE, 0,TimeUnit.NANOSECONDS,
               new DelayedWorkQueue());
 }
 

發現它和ThreadPoolExecutor有個定義上很大的區別就是,ThreadPoolExecutor用的是LinkedBlockingQueue(當然可以修改),它用的是DelayedWeorkQueue,而這個DelayedWorkQueue裡面你會發現它僅僅是對java.util.concurrent.DelayedQueue類一個簡單訪問包裝,這個佇列就是等待佇列,可以看到任務是被直接放到等待佇列中的,所以取資料必然從這裡獲取,而這個延遲的佇列有何神奇之處呢,它又是如何實現的呢,我們從什麼地方下手去看這個DelayWorkQueue? 我們還是回頭看看Worker裡面的run方法(上一篇文章中已經講過):

        public void run() {
             try {
                 Runnable task = firstTask;
                 firstTask = null;
                 while (task != null || (task = getTask()) != null) {
                     runTask(task);
                     task = null;
                 }
             } finally {
                 workerDone(this);
             }
         }
 

這裡面要呼叫等待佇列就是getTask()方法:

 Runnable getTask() {
         for (;;) {
             try {
                 int state = runState;
                 if (state > SHUTDOWN)
                    return null;
                Runnable r;
                if (state == SHUTDOWN)  // Help drain queue
                    r = workQueue.poll();
                else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
                    r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
                else
                    r = workQueue.take();
                if (r != null)
                    return r;
                if (workerCanExit()) {
                    if (runState >= SHUTDOWN) // Wake up others
                        interruptIdleWorkers();
                    return null;
                }
            } catch (InterruptedException ie) {
            }
        }
}

發現沒有,如果沒有設定超時,預設只會通過workQueue.take()方法獲取資料,那麼我們就看take方法,而增加到佇列裡面的方法自然看offer相關的方法。接下來我們來看下DelayQueue這個佇列的take方法:

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();//等待訊號,執行緒一直掛在哪裡
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);//最左等delay的時間段
                    } else {
                        E x = q.poll();//可以執行,取出一個
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll();
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
}

這裡的for就是要找到資料為止,否則就等著,而這個“q”和“available”是什麼呢?

private transient final Condition available = lock.newCondition();

private final PriorityQueue q = new PriorityQueue();

怎麼裡面還有一層佇列,不用怕,從這裡你貌似看出點名稱意味了,就是它是優先順序佇列,而對於任務排程來講,優先順序的方式就是時間,我們用這中猜測來繼續深入原始碼。

上面首先獲取這個佇列的第一個元素,若為空,就等待一個“available”發出的訊號,我們可以猜測到這個offer的時候會發出的訊號,一會來驗證即可;若不為空,則通過getDelay方法來獲取時間資訊,這個getDelay方法就用上了我們開始說的ScheduledFutureTask了,如果是時間大於0,則也進入等待,因為還沒開始執行,等待也是“available”發出訊號,但是有一個最長時間,為什麼還要等這個訊號,是因為有可能進來一個新的任務,比這個等待的任務還要先執行,所以要等這個訊號;而最多等這麼長時間,就是因為如果這段時間沒任務進來肯定就是它執行了。然後就返回的這個值,被Worker(上面有提到)拿到後呼叫其run()方法進行執行。

那麼寫入佇列在那裡?他們是如何排序的?

我們看看佇列的寫入方法是這樣的:

public boolean offer(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E first = q.peek();
            q.offer(e);
            if (first == null || e.compareTo(first) < 0)
                 available.signalAll();
             return true;
         } finally {
             lock.unlock();
         }
 }
 

佇列也是首先取出第一個(後面會用來和當前任務做比較),而這裡“q”是上面提到的“PriorityQueue”,看來offer的關鍵還在它的裡面,我們看看呼叫過程:

 public boolean offer(E e) {
         if (e == null)
             throw new NullPointerException();
         modCount++;
         int i = size;
         if (i >= queue.length)
            grow(i + 1);
        size = i + 1;
        if (i == 0)
            queue[0] = e;
        else
            siftUp(i, e);//主要是這條程式碼很關鍵
        return true;
}
private void siftUp(int k, E x) {
        if (comparator != null)
            siftUpUsingComparator(k, x);
        else
        //我們預設走這裡,因為DelayQueue定義它的時候預設沒有給定義comparator
            siftUpComparable(k, x);
}
/*
可以發現這個方法是將任務按照compareTo對比後,放在佇列的合適位置,但是它肯定不是絕對順序的,這一點和Timer的內部排序機制類似。
*/
private void siftUpComparable(int k, E x) {
        Comparable<? super E> key = (Comparable<? super E>) x;
        while (k > 0) {
            int parent = (k - 1) >>> 1;
            Object e = queue[parent];
            if (key.compareTo((E) e) >= 0)
                break;
            queue[k] = e;
            k = parent;
        }
        queue[k] = key;
}

你是否發現,compareTo也用上了,就是我們前面描述一大堆的:ScheduledFutureTask類中的一個方法,那麼run方法也用上了,這個過程貌似完整了。

我們再來理一下思路:

1、呼叫的Thread的包裝,由在ThreadPoolExecutor中的Worker呼叫你傳入的Runnable的run方法,變成了Worker呼叫Runnable的run方法,由它來處理時間片的資訊呼叫你傳入的執行緒。

2、ScheduledFutureTask類在整個過程中提供了基礎參考的方法,其中最為關鍵的就是實現了介面Comparable,實現內部的compareTo方法,也實現了Delayed介面中的getDelay方法用以判定時間(當然Delayed介面本身也是繼承於Comparable,我們不要糾結於細節概念就好)。

3、等待佇列由在ThreadPoolExecutor中預設使用的LinkedBlockingQueue換成了DelayQueue(它是被DelayWorkQueue包裝了一下子,沒多大區別),而DelayQueue主要提供了一個訊號量“available”來作為寫入和讀取的訊號控制開關,通過另一個優先順序佇列“PriorityQueue”來控制實際的佇列順序,他們的順序就是基於上面提到的ScheduledFutureTask類中的compareTo方法,而是否執行也是基於getDelay方法來實現的。

4、ScheduledFutureTask類的run方法會判定是否為時間片資訊,如果為時間片,在執行完對應的方法後,開始計算下一次執行時間(注意判定時間片大於0,小於0,分別代表的是以當前執行完的時間為準計算下一次時間還是以當前時間為準),這個在前面有提到。

5、它是支援多執行緒的,和Timer的機制最大的區別就在於多個執行緒會最徵用這個佇列,隊裡的排序方式和Timer有很多相似之處,並非完全有序,而是通過位移動來儘量找到合適的位置,有點類似貪心的演算法,呵呵。


謝 宇

淘寶java工程師,技術愛好者,於工作原因,學得很雜,喜歡用到什麼學什麼,現學現用。
例如JVM、Java ByteCode、資料庫、javaIO、java併發什麼的。研究得足夠解決自己問題,以及更加深入一層即可。
擅長解決問題思考問題的人,而且經常解決那種沒遇到過的問題和去思考一些技術之間的聯絡,遇到解決不了的問題或想不清楚的關係是個吃不下飯睡不著覺的人。