1. 程式人生 > >Java 定時任務實現原理詳解

Java 定時任務實現原理詳解

在jdk自帶的庫中,有兩種技術可以實現定時任務。一種是使用Timer,另外一個則是ScheduledThreadPoolExecutor。下面為大家分析一下這兩個技術的底層實現原理以及各自的優缺點。
一、Timer
1. Timer的使用

class MyTask extends TimerTask{
    @Override
    public void run() {
        System.out.println("hello world");
    }
}
public class TimerDemo {
    public static void main(String[] args) {
        //建立定時器物件
        Timer t=new Timer();
        //在3秒後執行MyTask類中的run方法,後面每10秒跑一次
        t.schedule(new MyTask(), 3000,10000);

    }
}

通過往Timer提交一個TimerTask的任務,同時指定多久後開始執行以及執行週期,我們可以開啟一個定時任務。
2. 原始碼解析

首先我們先來看一下Timer這個類

//存放定時任務的佇列
//這個TaskQueue 也是Timer內部自定義的一個佇列,這個佇列通過最小堆來維護佇列
//下一次執行時間距離現在最小的會被放在堆頂,到時執行執行緒直接獲取堆頂任務並判斷是否執行即可
private final TaskQueue queue = new TaskQueue();
//負責執行定時任務的執行緒
private final TimerThread thread = new TimerThread(queue);
public Timer() {
        this("Timer-" + serialNumber());
}
public Timer(String name) {
        //設定執行緒的名字,並且啟動這個執行緒
        thread.setName(name);
        thread.start();
}

再來看一下TimerThread 這個類,這個類也是定義在Timer.class中的一個類,它繼承了Thread類,所以可以直接拿來當執行緒使用。
我們直接來看他的構造方法以及run方法

//在Timer中初始化的時候會將Timer的Queue賦值進來
TimerThread(TaskQueue queue) {
        this.queue = queue;
}
public void run() {
        try {
            //進入自旋,開始不斷的從任務佇列中獲取定時任務來執行
            mainLoop();
        } finally {
            // Someone killed this Thread, behave as if Timer cancelled
            synchronized(queue) {
                newTasksMayBeScheduled = false;
                queue.clear();  // Eliminate obsolete references
            }
        }
}
private void mainLoop() {
        while (true) {
            try {
                TimerTask task;
                boolean taskFired;
                //加同步
                synchronized(queue) {
                    //如果任務佇列為空,並且newTasksMayBeScheduled為true,就休眠等待,直到有任務進來就會喚醒這個執行緒
                    //如果有人呼叫timer的cancel方法,newTasksMayBeScheduled會變成false
                    while (queue.isEmpty() && newTasksMayBeScheduled)
                        queue.wait();
                    if (queue.isEmpty())
                        break;

                    // 獲取當前時間和下次任務執行時間
                    long currentTime, executionTime;
                    //獲取佇列中最早要執行的任務
                    task = queue.getMin();
                    synchronized(task.lock) {
                        //如果這個任務已經被結束了,就從佇列中移除
                        if (task.state == TimerTask.CANCELLED) {
                            queue.removeMin();
                            continue;  // No action required, poll queue again
                        }
                        //獲取當前時間和下次任務執行時間
                        currentTime = System.currentTimeMillis();
                        executionTime = task.nextExecutionTime;
                        //判斷任務執行時間是否小於當前時間,表示小於,就說明可以執行了
                        if (taskFired = (executionTime<=currentTime)) {
                            //如果任務的執行週期是0,說明只要執行一次就好了,就從佇列中移除它,這樣下一次就不會獲取到該任務了
                            if (task.period == 0) {
                                queue.removeMin();
                                task.state = TimerTask.EXECUTED;
                            } else {
                                //重新設定該任務下一次的執行時間
                                //如果之前設定的period小於0,就用當前時間-period,等於就是當前時間加上週期值
                                //這裡的下次執行時間就是當前的執行時間加上週期值
                                //這裡涉及到是否以固定頻率呼叫任務的問題,下面再詳細講解
                                queue.rescheduleMin(
                                  task.period<0 ? currentTime   - task.period
                                                : executionTime + task.period);
                            }
                        }
                    }
                    //如果任務的執行時間還沒到,就計算出還有多久才到達執行時間,然後執行緒進入休眠
                    if (!taskFired)
                        queue.wait(executionTime - currentTime);
                }
                //如果任務的執行時間到了,就執行這個任務
                if (taskFired)
                    task.run();
            } catch(InterruptedException e) {
            }
        }
}
 

通過上面的程式碼,我們大概瞭解了Timer是怎麼工作的了。下面來看一下schedule()方法的相關程式碼

//Timer.java
public void schedule(TimerTask task, long delay, long period) {
        if (delay < 0)
            throw new IllegalArgumentException("Negative delay.");
        if (period <= 0)
            throw new IllegalArgumentException("Non-positive period.");
        //呼叫內部的一個方法
        sched(task, System.currentTimeMillis()+delay, -period);
}

private void sched(TimerTask task, long time, long period) {
        if (time < 0)
            throw new IllegalArgumentException("Illegal execution time.");

        // 如果設定的定時任務週期太長,就將其除以2
        if (Math.abs(period) > (Long.MAX_VALUE >> 1))
            period >>= 1;

        //加鎖同步
        synchronized(queue) {
            if (!thread.newTasksMayBeScheduled)
                throw new IllegalStateException("Timer already cancelled.");
            //設定任務的各個屬性
            synchronized(task.lock) {
                if (task.state != TimerTask.VIRGIN)
                    throw new IllegalStateException(
                        "Task already scheduled or cancelled");
                task.nextExecutionTime = time;
                task.period = period;
                task.state = TimerTask.SCHEDULED;
            }
            //將任務加入到佇列中
            queue.add(task);
            //如果任務加入佇列後排在堆頂,說明該任務可能馬上可以執行了,那就喚醒執行執行緒
            if (queue.getMin() == task)
                queue.notify();
        }
}

3. 總結

Timer的原理比較簡單,當我們初始化Timer的時候,timer內部會啟動一個執行緒,並且初始化一個優先順序佇列,該優先順序佇列使用了最小堆的技術來將最早執行時間的任務放在堆頂。
當我們呼叫schedule方法的時候,其實就是生成一個任務然後插入到該優先順序佇列中。最後,timer內部的執行緒會從優先順序佇列的堆頂獲取任務,獲取到任務後,先判斷執行時間是否到了,如果到了先設定下一次的執行時間並調整堆,然後執行任務。如果沒到執行時間那執行緒就休眠一段時間。
關於計算下次任務執行時間的策略:
這裡設定下一次執行時間的演算法會根據傳入peroid的值來判斷使用哪種策略:
- 如果peroid是負數,那下一次的執行時間就是當前時間+peroid的值
- 如果peroid是正數,那下一次執行時間就是該任務這次的執行時間+peroid的值。
這兩個策略的不同點在於,如果計算下次執行時間是以當前時間為基數,那它就不是以固定頻率來執行任務的。因為Timer是單執行緒執行任務的,如果A任務執行週期是10秒,但是有個B任務執行了20幾秒,那麼下一次A任務的執行時間就要等B執行完後輪到自己時,再過10秒才會執行下一次。
如果策略是這次任務的執行時間+peroid的值就是按固定頻率不斷執行任務了。讀者可以自行模擬一下
二、ScheduledThreadPoolExecutor
1. 使用

ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(8);
        scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                System.out.println("hello world");
            }
}, 1, 3, TimeUnit.SECONDS);

2. 實現原理+原始碼解析

由於ScheduledThreadPoolExecutor是基於執行緒池實現的。所以瞭解它的原理之前讀者有必要先了解一下Java執行緒池的實現。關於Java執行緒池的實現原理,可以看我的另外一篇部落格:Java執行緒池實現原理詳解

我們直接來看一下的原始碼

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        //將任務進行一層封裝,最後得到一個ScheduledFutureTask物件
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        //進行一些裝飾,其實就是返回sft這個物件
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //提交給執行緒池執行
        delayedExecute(t);
        return t;
}
private void delayedExecute(RunnableScheduledFuture<?> task) {
        //如果執行緒池已經關閉,就拒絕這個任務
        if (isShutdown())
            reject(task);
        else {
            //將當前任務加入到任務佇列中去
            super.getQueue().add(task);
            //判斷執行緒池是否關閉了,然後判斷是否需要移除這個任務
            if (isShutdown() &&
                !canRunInCurrentRunState(task.isPeriodic()) &&
                remove(task))
                task.cancel(false);
            else
                //因為這裡的定時任務是直接放到任務佇列中,所以需要保證已經有worker啟動了
                ensurePrestart();
        }
}
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        //如果worker的數量小於corePoolSize,那就啟動一個worker,用來消費任務佇列的任務
        if (wc < corePoolSize)
            addWorker(null, true);
        //worker的數量為0也直接啟動一個worker
        else if (wc == 0)
            addWorker(null, false);
}

到這裡,我們可以看到我們提交的任務被封裝成一個ScheduledFutureTask然後提交給任務佇列,同時如果發現worker的數量少於設定的corePoolSize,我們還會啟動一個worker執行緒。
但是,我們怎麼保證worker不會馬上就從任務佇列中獲取任務然後直接執行呢(這樣我們設定的延遲執行就沒有效果了)?
另外,怎麼保證任務執行完下一次在一定週期後還會再執行呢,也就是怎麼保證任務的延遲執行和週期執行?
我們先來看一下任務的延遲執行的解決方案。其實就是修改任務佇列的實現,通過將任務佇列變成延遲佇列,worker不會馬上獲取到任務佇列中的任務了。只有任務的時間到了,worker執行緒才能從延遲佇列中獲取到任務並執行。
在ScheduledThreadPoolExecutor中,定義了DelayedWorkQueue類來實現延遲佇列。DelayedWorkQueue內部使用了最小堆的資料結構,當任務插入到佇列中時,會根據執行的時間自動調整在堆中的位置,最後執行時間最近的那個會放在堆頂。
當worker要去佇列獲取任務時,如果堆頂的執行時間還沒到,那麼worker就會阻塞一定時間後才能獲取到那個任務,這樣就實現了任務的延遲執行。
由於篇幅問題,DelayedWorkQueue的原始碼就不作解析了,有興趣的朋友可以去ScheduledThreadPoolExecutor類中查閱。
解決了任務的延遲執行問題,接下來就是任務的週期執行的解決方案了。週期執行和前面封裝的ScheduledFutureTask有關。我們直接來看一下ScheduledFutureTask的run方法就知道了

public void run() {
            //先判斷任務是否週期執行
            boolean periodic = isPeriodic();
            //判斷是否能執行任務
            if (!canRunInCurrentRunState(periodic))
                cancel(false);
            //判斷是否週期性任務
            else if (!periodic)
                //不是的話執行執行run方法
                ScheduledFutureTask.super.run();
            else if (ScheduledFutureTask.super.runAndReset()) {
                //如果是週期性任務,那就設定下一次的執行時間
                setNextRunTime();
                //重新將任務放到佇列中,然後等待下一次執行
                reExecutePeriodic(outerTask);
            }
}
private void setNextRunTime() {
            //根據peroid的正負來判斷下一次執行時間的計算策略
            //和timer的下一次執行時間計算策略有點像
            long p = period;
            if (p > 0)
                time += p;
            else
                time = triggerTime(-p);
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
        //先判斷是否可以在當前狀態下執行
        if (canRunInCurrentRunState(true)) {
            //重新加任務放到任務佇列中
            super.getQueue().add(task);
            if (!canRunInCurrentRunState(true) && remove(task))
                task.cancel(false);
            else
                ensurePrestart();
        }
}
  

從原始碼可以看出,當任務執行完後,如果該任務時週期性任務,那麼會重新計算下一次執行時間,然後重新放到任務佇列中等待下一次執行。
3. 總結

ScheduledThreadPoolExecutor的實現是基於java執行緒池。通過對任務進行一層封裝來實現任務的週期執行,以及將任務佇列改成延遲佇列來實現任務的延遲執行。

我們將任務放入任務佇列的同時,會嘗試開啟一個worker來執行這個任務(如果當前worker的數量小於corePoolSize)。由於這個任務佇列時一個延遲佇列,只有任務執行時間達到才能獲取到任務,因此worker只能阻塞等到有佇列中有任務到達才能獲取到任務執行。

當任務執行完後,會檢查自己是否是一個週期性執行的任務。如果是的話,就會重新計算下一次執行的時間,然後重新將自己放入任務佇列中。

關於下一次任務的執行時間的計算規則,和Timer差不多,這裡就不多做介紹。
三、Timer和ScheduledThreadPoolExecutor的區別

由於Timer是單執行緒的,如果一次執行多個定時任務,會導致某些任務被其他任務所阻塞。比如A任務每秒執行一次,B任務10秒執行一次,但是一次執行5秒,就會導致A任務在長達5秒都不會得到執行機會。而ScheduledThreadPoolExecutor是基於執行緒池的,可以動態的調整執行緒的數量,所以不會有這個問題

如果執行多個任務,在Timer中一個任務的崩潰會導致所有任務崩潰,從而所有任務都停止執行。而ScheduledThreadPoolExecutor則不會。

Timer的執行週期時間依賴於系統時間,timer中,獲取到堆頂任務執行時間後,如果執行時間還沒到,會計算出需要休眠的時間=(執行時間-系統時間),如果系統時間被調整,就會導致休眠時間無限拉長,後面就算改回來了任務也因為在休眠中而得不到執行的機會。ScheduledThreadPoolExecutor由於用是了nanoTime來計算執行週期的,所以和系統時間是無關的,無論系統時間怎麼調整都不會影響到任務排程。

注意的是,nanoTime和系統時間是完全無關的(之前一直以為只是時間戳的納秒級粒度),關於nanoTime的介紹如下:

    返回最準確的可用系統計時器的當前值,以毫微秒為單位。
    此方法只能用於測量已過的時間,與系統或鐘錶時間的其他任何時間概念無關。返回值表示從某一固定但任意的時間算起的毫微秒數(或許從以後算起,所以該值可能為負)。此方法提供毫微秒的精度,但不是必要的毫微秒的準確度。它對於值的更改頻率沒有作出保證。在取值範圍大於約 292 年(263 毫微秒)的連續呼叫的不同點在於:由於數字溢位,將無法準確計算已過的時間。

總體來說,Timer除了在版本相容性上面略勝一籌以外(Timer是jdk1.3就支援的,而ScheduledThreadPoolExecutor在jdk1.5才出現),其餘全部被ScheduledThreadPoolExecutor碾壓。所以日常技術選型中,也推薦使用ScheduledThreadPoolExecutor來實現定時任務。
---------------------
作者:瘋狂哈丘
來源:CSDN
原文:https://blog.csdn.net/u013332124/article/details/79603943
版權宣告:本文為博主原創文章,轉載請附上博文連結!