1. 程式人生 > >Java定時任務Timer排程器【二】 多執行緒原始碼分析(圖文版)

Java定時任務Timer排程器【二】 多執行緒原始碼分析(圖文版)

 

上一節通過一個小例子分析了Timer執行過程,牽涉的執行執行緒雖然只有兩個,但實際場景會比上面複雜一些。

首先通過一張簡單類圖(只列出簡單的依賴關係)看一下Timer暴露的介面。

 

為了演示Timer所暴露的介面,下面舉一個極端的例子(每一個介面方法面向單獨的執行執行緒),照樣以鬧鐘為例(原始碼只列出關鍵部分,下同)。

public class ScheduleDemo {

    public static void main(String[] args) throws Exception {
        AlarmTask alarm1 = new AlarmTask("鬧鐘1");
        AlarmTask alarm2 = new AlarmTask("鬧鐘2");
        new Thread("執行緒1"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]排程鬧鐘1");
                timer.schedule(alarm1,delay,period);
            }
        }.start();
        new Thread("執行緒2"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]排程鬧鐘2");
                timer.schedule(alarm2,delay,period);
            }
        }.start();
        Thread.sleep(1500);
        new Thread("執行緒3"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]取消鬧鐘2");
                alarm2.cancel();
            }
        }.start();
        new Thread("執行緒4"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]清理無用鬧鐘");
                timer.purge();
            }
        }.start();
        new Thread("執行緒5"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]關閉所有鬧鐘");
                timer.cancel();
            }
        }.start();
    }

    /**
     *     模擬鬧鐘
     */
    static class AlarmTask extends TimerTask{
        String name ;
        public AlarmTask(String name){
            this.name=name;
        }
        public void run() {
            log.info("["+Thread.currentThread().getName()+"]-["+name+"]嘀。。。");
            Thread.sleep(1000); //模擬鬧鐘執行時間
        }
    }
}

執行結果

[執行緒2]排程鬧鐘2
[執行緒1]排程鬧鐘1
[Timer-0]-[鬧鐘2]嘀。。。
[執行緒3]取消鬧鐘2
[執行緒4]清理無用鬧鐘
[執行緒5]關閉所有鬧鐘 

下面我們依次檢視一下每個介面方法的原始碼。

1. 檢視Timer.sched()原始碼

public void schedule(TimerTask task, long delay, long period) {
    sched(task, System.currentTimeMillis()+delay, -period);
}

private void sched(TimerTask task, long time, long period) {  
    // 如果period無限大,保證其在一個合理的範圍內
    if (Math.abs(period) > (Long.MAX_VALUE >> 1))
        period >>= 1;
    // 加queue鎖,保證佇列操作的執行緒安全
    synchronized(queue) {
        // 加lock鎖,保證任務狀態的一致性(多執行緒環境下)
        synchronized(task.lock) {
            task.nextExecutionTime = time;
            task.period = period;
            task.state = TimerTask.SCHEDULED;
        }
        // 將任務加入佇列實現排序
        queue.add(task);
        if (queue.getMin() == task)
            queue.notify();
    }
} 

其中queue.add(task在)將任務加入佇列的同時實現了內部排序。

void add(TimerTask task) {
    // 佇列不足時,以兩倍容量擴增
    if (size + 1 == queue.length)
        // 從效能上要快於new一個數組的效率
        queue = Arrays.copyOf(queue, 2 * queue.length);
    queue[++size] = task;
    // 利用二分查詢演算法實現任務排序
    fixUp(size);
}

private void fixUp(int k) {
    while (k > 1) {
        int j = k >> 1;
        if (queue[j].nextExecutionTime <= queue[k].nextExecutionTime)
            break;
        TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
        k = j;
    }
} 

從方法sched()可以看到,該方法一方面持有queue鎖,用來維護佇列排序的執行緒安全;一方面持有lock鎖,用來維護任務狀態的執行緒安全。

2. 檢視TimerTask.cancel()原始碼

public abstract class TimerTask implements Runnable {
  
    final Object lock = new Object();

    public boolean cancel() {
        synchronized(lock) {
            boolean result = (state == SCHEDULED);
            state = CANCELLED;
            return result;
        }
    }

對於任務的取消操作,只是簡單的修改一下任務狀態,中途也只佔有一個lock鎖!接著看一下執行任務的執行緒邏輯。

class TimerThread extends Thread {
   
    private TaskQueue queue;

    public void run() {
       mainLoop();
    }

    private void mainLoop() {
        while (true) {
                synchronized(queue) {
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    task = queue.getMin();
                    // 此處加task鎖,防止其他執行緒同時呼叫task.cancel()
                    synchronized(task.lock) {
                      // ...維護鬧鐘狀態
                    }
                  }
                  if (!taskFired) // 時間未到
                    queue.wait(executionTime - currentTime);
                }
                if (taskFired)
                    // 執行鬧鐘時,沒有保持任何鎖
                    task.run();
        }
    }

可以看到當TimerThead真正執行鬧鐘時,是沒有持鎖的,所以當鬧鐘正在執行的時候AlarmTask.cancel()對其是不起作用的,換言之,只能取消下一次將要執行的鬧鐘。

3. 檢視Timer.purge()原始碼

public class Timer {
   
    private final TaskQueue queue = new TaskQueue();

    // 保證被取消的task能及時進行垃圾回收
    public int purge() {
        int result = 0;
        synchronized(queue) {
            for (int i = queue.size(); i > 0; i--) {
                if (queue.get(i).state == TimerTask.CANCELLED) {
                    queue.quickRemove(i);
                    result++;
                }
            }
            if (result != 0)
                // 重新整理佇列中有效的任務
                queue.heapify();
        }
        return result;
    } 

進一步檢視queue.quickRemove(i)和queue.heapify()。

class TaskQueue {
 
    void quickRemove(int i) {
        queue[i] = queue[size];
        queue[size--] = null;  //清除無效任務,防止記憶體洩漏
    }

    private void fixDown(int k) {
        int j;
        while ((j = k << 1) <= size && j > 0) {
            if (j < size &&
                    queue[j].nextExecutionTime > queue[j+1].nextExecutionTime)
                j++; // j indexes smallest kid
            if (queue[k].nextExecutionTime <= queue[j].nextExecutionTime)
                break;
            TimerTask tmp = queue[j];  queue[j] = queue[k]; queue[k] = tmp;
            k = j;
        }
    }

    void heapify() {
        for (int i = size/2; i >= 1; i--)
            fixDown(i);
    } 

可以看到Timer.purge()在持有queue鎖時主要做兩件事

1.及時清除佇列中無效的鬧鐘防止記憶體洩漏。

2.重新規整佇列中鬧鐘。

4. 最後看一下Timer.cancel()原始碼

public class Timer {
  
    private final TaskQueue queue = new TaskQueue();

    private final TimerThread thread = new TimerThread(queue);

    public void cancel() {
        synchronized(queue) {
            thread.newTasksMayBeScheduled = false;
            queue.clear();
          //防止佇列為空的情況下,TimerThead無限等待
            queue.notify();  
        }
    } 

該方法在清除所有鬧鐘的同時,與TimerThread發生了一次執行緒通訊——喚醒TimerThread並讓其永久退出。

private void mainLoop() {
    while (true) {
            synchronized(queue) {
                while (queue.isEmpty() && newTasksMayBeScheduled)
                    queue.wait();
                if (queue.isEmpty())
                    break;  // TimerThread永久退出
                queue.wait(executionTime - currentTime);
            }
     }
} 

以上是整個過程的靜態分析,現在捕捉一個執行緒快照進行動態分析。為了dump一個特定時刻的執行緒快照,現在在Timer.sched()打一個斷點(注意斷點的方式與位置)。

以debug模式執行下面的例子。

public class ScheduleDemo {

    public static void main(String[] args) throws Exception {
        AlarmTask alarm1 = new AlarmTask("鬧鐘1");
        AlarmTask alarm2 = new AlarmTask("鬧鐘2");
        new Thread("執行緒1"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]排程鬧鐘1");
                timer.schedule(alarm1,delay,period);
            }
        }.start();
        new Thread("執行緒2"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]排程鬧鐘2");
                timer.schedule(alarm2,delay,period);
            }
        }.start();
        Thread.sleep(1500);
        new Thread("執行緒3"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]取消鬧鐘2");
                alarm2.cancel();
            }
        }.start();
        new Thread("執行緒4"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]清理無用鬧鐘");
                timer.purge();
            }
        }.start();
        new Thread("執行緒5"){
            public void run() {
                log.info("["+Thread.currentThread().getName()+"]關閉所有鬧鐘");
                timer.cancel();
            }
        }.start();
    }

    /**
     *     模擬鬧鐘
     */
    static class AlarmTask extends TimerTask{
        String name ;
        public AlarmTask(String name){
            this.name=name;
        }
        public void run() {
            log.info("["+Thread.currentThread().getName()+"]-["+name+"]嘀。。。");
            Thread.sleep(1000); //模擬鬧鐘執行時間
        }
    }
}

下圖是visualVM工具dump出的執行緒快照(斷點處)

通過上面的快照可以看到,當“執行緒1“(持有兩把鎖)處於RUNNABLE狀態時,”執行緒2“、“執行緒3”、“執行緒4”、“執行緒5”都處於BLOCKED狀態。需要注意的是,因為TimerThread的時間未到,暫時處於WATING狀態(等待喚醒)。

下面是一個簡單的形象圖

總結:Timer為了保證執行緒安全,使用了大量的鎖機制,整體上對CPU的利用率不高。