Quartz定時任務執行原理分析
1.定時任務執行原理
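在java中常見的定時排程方案有:ScheduledExecutorService和quartz兩種方案。其本質上都是讓執行緒阻塞等待,直到觸發時間到達再喚醒執行:ScheduledExecutorService基於Condition.awaitNanos(底層為LockSupport.parkNanos),Quartz基於Object.wait,二者最終都依賴native方法實現。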
1.1 ScheduledExecutorService定時執行原理
ScheduledExecutorService的schedule方法在延遲delay之後執行一次任務;若需要週期性執行,則使用scheduleAtFixedRate或scheduleWithFixedDelay。
/**
 * Declaration excerpted from {@code java.util.concurrent.ScheduledExecutorService}.
 * NOTE(review): per the JDK Javadoc this overload runs {@code command} once, after
 * {@code delay} (expressed in {@code unit}); periodic execution uses scheduleAtFixedRate
 * or scheduleWithFixedDelay instead — confirm against the JDK version in use.
 */
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
其核心執行類和方法:其主要流程就是根據initTime和period時間計算出第一次執行的時間差,然後呼叫ReentrantLock.newCondition().awaitNanos(long nanosTimeout)方法,到指定的時間進行喚醒,分配執行緒進行執行。對於後續的週期性執行的await時間為period.
從下面的程式碼可以發現,在每次執行完成後會呼叫setNextRunTime來設定下次需要排程的時間,然後根據當前時間和下次執行時間的差值進行await(),當下次觸發時間到達則喚醒執行緒進行執行。
/*
 * Excerpt from the JDK's ScheduledThreadPoolExecutor, trimmed to the two methods discussed above.
 * NOTE(review): in the JDK source, take() belongs to the nested DelayedWorkQueue class and
 * setNextRunTime() to the nested ScheduledFutureTask class; the fields used here (lock,
 * available, queue, leader, period, time) are declared there — confirm against the JDK version.
 */
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
/**
 * Takes the next task to run: blocks until the task at the head of the queue
 * ({@code queue[0]}) has a delay of zero or less, then returns it via {@code finishPoll}.
 *
 * <p>Only one waiting thread (the {@code leader}) does a timed wait equal to the head task's
 * remaining delay; every other waiting thread blocks on {@code available} without a timeout.
 *
 * @return the head task once it is due
 * @throws InterruptedException if interrupted while acquiring the lock or while waiting
 */
public RunnableScheduledFuture<?> take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture<?> first = queue[0];
// Empty queue: wait without a timeout until signalled (presumably when a task is added).
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
// Due or overdue: finishPoll (not shown) presumably removes it from the queue.
if (delay <= 0)
return finishPoll(first);
first = null; // don't retain ref while waiting
// Another thread is already doing the timed wait for the head task.
if (leader != null)
available.await();
else {
// Become the leader and sleep only until the head task is due.
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
// No leader but work is still queued: wake one waiter so it can take over.
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}
/**
 * Computes the next execution time of a periodic task after a run.
 * A positive {@code period} advances from the previously scheduled {@code time};
 * otherwise the next time is {@code triggerTime(-p)}.
 * NOTE(review): per the JDK, a positive period encodes scheduleAtFixedRate, a negative one
 * scheduleWithFixedDelay, and triggerTime(d) returns "now + d" — confirm.
 */
private void setNextRunTime() {
long p = period;
if (p > 0)
time += p;
else
time = triggerTime(-p);
}
// ... remaining members omitted; this excerpt intentionally does not close the class body.
1.2 Quartz定時排程的執行原理
quartz定時排程是通過Object.wait方式(native方法)實現的,其本質是通過作業系統的時鐘來實現的。Quartz主要的執行類和執行方法是QuartzSchedulerThread.run()。
其主要流程如下:
/**
 * Excerpt of Quartz's main scheduler thread; only {@link #run()} is shown.
 * The fields it uses (halted, paused, sigLock, qs, qsRsrcs, idleWaitTime, log) and the helpers
 * it calls (clearSignaledSchedulingChange, isCandidateNewTimeEarlierWithinReason,
 * releaseIfScheduleChangedSignificantly, getRandomizedIdleWaitTime, isScheduleChanged,
 * getLog) are declared elsewhere in the real class.
 */
public class QuartzSchedulerThread extends Thread{
/**
 * Main scheduling loop; runs until {@code halted} becomes true.
 *
 * <p>Each iteration:
 * <ol>
 * <li>while paused, waits on {@code sigLock} in 1-second slices;</li>
 * <li>blocks until the thread pool reports at least one free worker;</li>
 * <li>acquires a batch of triggers from the job store, passing {@code now + idleWaitTime}
 *     as the acquisition horizon;</li>
 * <li>waits on {@code sigLock} until the first trigger's fire time is at most 2 ms away,
 *     stopping early if {@code releaseIfScheduleChangedSignificantly} returns true;</li>
 * <li>marks the batch as fired in the job store and hands each fired bundle, wrapped in a
 *     {@code JobRunShell}, to the thread pool;</li>
 * <li>if nothing was acquired, waits a randomized idle time before polling again.</li>
 * </ol>
 * A {@code RuntimeException} thrown inside an iteration is logged and the loop continues.
 * After the loop exits, the scheduler references are cleared.
 */
public void run() {
boolean lastAcquireFailed = false;
while (!halted.get()) {
try {
// check if we're supposed to pause...
synchronized (sigLock) {
while (paused && !halted.get()) {
try {
// wait until togglePause(false) is called...
sigLock.wait(1000L);
} catch (InterruptedException ignore) {
// Interrupt is swallowed; the while condition re-checks paused/halted.
}
}
if (halted.get()) {
break;
}
}
// Blocks until at least one worker thread is free.
int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...
List<OperableTrigger> triggers = null;
long now = System.currentTimeMillis();
clearSignaledSchedulingChange();
try {
// Acquire at most min(free workers, max batch size) triggers, with now + idleWaitTime
// as the acquisition horizon and the configured batch time window.
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
lastAcquireFailed = false;
if (log.isDebugEnabled())
log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
// Only the first failure in a row is reported, so listeners are not flooded.
if(!lastAcquireFailed) {
qs.notifySchedulerListenersError(
"An error occurred while scanning for the next triggers to fire.",
jpe);
}
lastAcquireFailed = true;
// NOTE(review): this excerpt retries immediately, with no back-off between failures.
continue;
} catch (RuntimeException e) {
if(!lastAcquireFailed) {
getLog().error("quartzSchedulerThreadLoop: RuntimeException "
+e.getMessage(), e);
}
lastAcquireFailed = true;
continue;
}
if (triggers != null && !triggers.isEmpty()) {
now = System.currentTimeMillis();
// The first trigger of the batch decides how long to wait.
long triggerTime = triggers.get(0).getNextFireTime().getTime();
long timeUntilTrigger = triggerTime - now;
// Keep waiting until the fire time is at most 2 ms away.
while(timeUntilTrigger > 2) {
synchronized (sigLock) {
if (halted.get()) {
// Leaves this wait loop only; goAhead below then skips firing.
break;
}
if (!isCandidateNewTimeEarlierWithinReason(triggerTime, false)) {
try {
// we could have blocked a long while
// on 'synchronize', so we must recompute
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
if(timeUntilTrigger >= 1)
sigLock.wait(timeUntilTrigger);
} catch (InterruptedException ignore) {
// Swallowed; the remaining time is recomputed below.
}
}
}
// May release the acquired triggers (emptying the list) and abandon the wait.
if(releaseIfScheduleChangedSignificantly(triggers, triggerTime)) {
break;
}
now = System.currentTimeMillis();
timeUntilTrigger = triggerTime - now;
}
// this happens if releaseIfScheduleChangedSignificantly decided to release triggers
if(triggers.isEmpty())
continue;
// set triggers to 'executing'
List<TriggerFiredResult> bndles = new ArrayList<TriggerFiredResult>();
boolean goAhead = true;
synchronized(sigLock) {
goAhead = !halted.get();
}
// NOTE(review): when halted, the acquired triggers are not released on this path;
// confirm they are cleaned up during shutdown.
if(goAhead) {
try {
List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);
if(res != null)
bndles = res;
} catch (SchedulerException se) {
qs.notifySchedulerListenersError(
"An error occurred while firing triggers '"
+ triggers + "'", se);
//QTZ-179 : a problem occurred interacting with the triggers from the db
//we release them and loop again
for (int i = 0; i < triggers.size(); i++) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
}
continue;
}
}
// Results are paired with triggers by index; this assumes triggersFired() returns them
// in the same order as the input list.
for (int i = 0; i < bndles.size(); i++) {
TriggerFiredResult result = bndles.get(i);
TriggerFiredBundle bndle = result.getTriggerFiredBundle();
Exception exception = result.getException();
if (exception instanceof RuntimeException) {
getLog().error("RuntimeException while firing trigger " + triggers.get(i), exception);
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// it's possible to get 'null' if the triggers was paused,
// blocked, or other similar occurrences that prevent it being
// fired at this time... or if the scheduler was shutdown (halted)
if (bndle == null) {
qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
continue;
}
// Wrap the fired bundle in a JobRunShell; on failure mark the job's triggers as errored.
JobRunShell shell = null;
try {
shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
shell.initialize(qs);
} catch (SchedulerException se) {
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
continue;
}
// Hand the shell to the thread pool for execution.
if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
// this case should never happen, as it is indicative of the
// scheduler being shutdown or a bug in the thread pool or
// a thread pool being used concurrently - which the docs
// say not to do...
getLog().error("ThreadPool.runInThread() return false!");
qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
}
}
continue; // while (!halted)
}
} else { // if(availThreadCount > 0)
// should never happen, if threadPool.blockForAvailableThreads() follows contract
continue; // while (!halted)
}
// Reached only when no trigger was acquired: idle for a randomized period before polling again.
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
// waitTime - now is just the randomized idle time.
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
// Swallowed; the outer loop re-checks halted on its next iteration.
}
}
} catch(RuntimeException re) {
// Log and keep the scheduler thread alive.
getLog().error("Runtime error occurred in main trigger firing loop.", re);
}
} // while (!halted)
// drop references to scheduler stuff to aid garbage collection...
qs = null;
qsRsrcs = null;
}
}
- 獲取最近需要執行的任務列表(30s內將要執行的任務),並根據執行時間進行排序,然後計算出需要wait()的時間。當排程時間來臨,喚醒主執行緒,將任務交給一個執行緒池進行執行。
// Excerpt from QuartzSchedulerThread.run(): acquire at most min(free worker threads, max batch size)
// triggers, passing now + idleWaitTime as the acquisition horizon and the configured batch time window.
triggers = qsRsrcs.getJobStore().acquireNextTriggers(
now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
- 30秒內沒有需要執行的任務,則等待一個隨機時間。getRandomizedIdleWaitTime產生一個30秒內隨機等待時間。
// Excerpt from QuartzSchedulerThread.run(): idle wait used when no trigger was acquired.
long now = System.currentTimeMillis();
long waitTime = now + getRandomizedIdleWaitTime();
// waitTime - now is just the randomized idle time.
long timeUntilContinue = waitTime - now;
synchronized(sigLock) {
try {
if(!halted.get()) {
// QTZ-336 A job might have been completed in the mean time and we might have
// missed the scheduled changed signal by not waiting for the notify() yet
// Check that before waiting for too long in case this very job needs to be
// scheduled very soon
if (!isScheduleChanged()) {
sigLock.wait(timeUntilContinue);
}
}
} catch (InterruptedException ignore) {
// Swallowed; in run() the enclosing while (!halted) loop simply continues.
}
}
2.quartz-db定時排程執行過程分析
2.1 執行流程圖
在QuartzSchedulerThread主執行緒中,首先會從QRTZ_TRIGGERS表中取出最近30秒內將要執行的任務,然後等待executeTime-now時間,被喚醒後將任務交給執行緒池處理。當任務執行完成時,會通過事件機制,更新QRTZ_TRIGGERS中的nextFireTime。在每次獲取QRTZ_TRIGGERS最近30秒的Trigger時,都會先對QRTZ_LOCKS表中的TRIGGER_ACCESS行進行加鎖(SELECT ... FOR UPDATE),從而保證了一個任務只會在分散式環境中的一臺機器上執行。
2.2 結合DB分析執行過程
- QRTZ_TRIGGERS:用於儲存接下來需要進行排程的trigger。
SELECT TRIGGER_NAME, TRIGGER_GROUP, NEXT_FIRE_TIME, PRIORITY FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'quartzScheduler' AND TRIGGER_STATE = 'WAITING' AND NEXT_FIRE_TIME <= 1506322290680 AND (MISFIRE_INSTR = -1 OR (MISFIRE_INSTR != -1 AND NEXT_FIRE_TIME >= 1506322375502)) ORDER BY NEXT_FIRE_TIME ASC, PRIORITY DESC
例如因為機器重啟被中斷的trigger
(該trigger是在機器啟動時從QRTZ_FIRED_TRIGGERS掃描到的記錄,都會加入到QRTZ_TRIGGERS等待進一步呼叫,其NEXT_FIRE_TIME為空,表示優先順序最高)
mysql> select TRIGGER_NAME,TRIGGER_GROUP,from_unixtime(NEXT_FIRE_TIME/1000) from QRTZ_TRIGGERS;
+----------------------------------------------------------------+-----------------+------------------------------------+
| TRIGGER_NAME | TRIGGER_GROUP | from_unixtime(NEXT_FIRE_TIME/1000) |
+----------------------------------------------------------------+-----------------+------------------------------------+
| recover_caowenyideMacBook-Pro.local1506316013536_1506322246054 | RECOVERING_JOBS | 2017-09-25 14:48:30.0070 |
| sampleJobCronTrigger | DEFAULT | 2017-09-25 14:56:45.8750 |
+----------------------------------------------------------------+-----------------+------------------------------------+
- QRTZ_FIRED_TRIGGERS: 記錄正在進行排程的TRIGGER
INSERT INTO QRTZ_FIRED_TRIGGERS (SCHED_NAME, ENTRY_ID, TRIGGER_NAME, TRIGGER_GROUP, INSTANCE_NAME, FIRED_TIME, SCHED_TIME, STATE, JOB_NAME, JOB_GROUP, IS_NONCONCURRENT, REQUESTS_RECOVERY, PRIORITY) VALUES('quartzScheduler', ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
mysql> select TRIGGER_NAME,TRIGGER_GROUP,TRIGGER_STATE from QRTZ_TRIGGERS;
+----------------------------------------------------------------+-----------------+---------------+
| TRIGGER_NAME | TRIGGER_GROUP | TRIGGER_STATE |
+----------------------------------------------------------------+-----------------+---------------+
| recover_caowenyideMacBook-Pro.local1506316013536_1506322246054 | RECOVERING_JOBS | ACQUIRED |
| sampleJobCronTrigger | DEFAULT | WAITING |
+----------------------------------------------------------------+-----------------+---------------+
mysql> select TRIGGER_NAME,TRIGGER_GROUP from QRTZ_FIRED_TRIGGERS;
+----------------------------------------------------------------+-----------------+
| TRIGGER_NAME | TRIGGER_GROUP |
+----------------------------------------------------------------+-----------------+
| recover_caowenyideMacBook-Pro.local1506316013536_1506322246054 | RECOVERING_JOBS |
+----------------------------------------------------------------+-----------------+
SELECT * FROM QRTZ_LOCKS WHERE SCHED_NAME = 'quartzScheduler' AND LOCK_NAME = ? FOR UPDATE
INSERT INTO QRTZ_LOCKS(SCHED_NAME, LOCK_NAME) VALUES ('quartzScheduler', ?)
//SELECT ... FOR UPDATE對鎖記錄加行鎖,其他節點的事務會阻塞直到鎖被釋放;若鎖記錄不存在則先INSERT,獲取鎖失敗時直接拋異常。由此保證同一時刻只有一個節點能獲得鎖
mysql> select * from QRTZ_LOCKS;
+-----------------+----------------+
| SCHED_NAME | LOCK_NAME |
+-----------------+----------------+
| quartzScheduler | STATE_ACCESS |
| quartzScheduler | TRIGGER_ACCESS |
+-----------------+----------------+
- 執行任務
job = sched.getJobFactory().newJob(firedTriggerBundle, scheduler);
- 任務執行完成後(事件機制):
DELETE FROM QRTZ_TRIGGERS WHERE SCHED_NAME = 'quartzScheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
DELETE FROM QRTZ_FIRED_TRIGGERS WHERE SCHED_NAME = 'quartzScheduler' AND ENTRY_ID = ?
並且更新下次要排程的trigger時間:
UPDATE QRTZ_TRIGGERS SET JOB_NAME = ?, JOB_GROUP = ?, DESCRIPTION = ?, NEXT_FIRE_TIME = ?, PREV_FIRE_TIME = ?, TRIGGER_STATE = ?, TRIGGER_TYPE = ?, START_TIME = ?, END_TIME = ?, CALENDAR_NAME = ?, MISFIRE_INSTR = ?, PRIORITY = ?, JOB_DATA = ? WHERE SCHED_NAME = 'quartzScheduler' AND TRIGGER_NAME = ? AND TRIGGER_GROUP = ?
mysql> select TRIGGER_NAME,from_unixtime(NEXT_FIRE_TIME/1000),from_unixtime(PREV_FIRE_TIME/1000),from_unixtime(START_TIME/1000),TRIGGER_STATE from QRTZ_TRIGGERS;
+----------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| TRIGGER_NAME | from_unixtime(NEXT_FIRE_TIME/1000) | from_unixtime(PREV_FIRE_TIME/1000) | from_unixtime(START_TIME/1000) | TRIGGER_STATE |
+----------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| sampleJobCronTrigger | 2017-09-25 15:45:00.0000 | 2017-09-25 15:40:00.0000 | 2017-09-25 11:55:02.0000 | WAITING |
+----------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
1 row in set (0.00 sec)
PS:QRTZ_TRIGGERS每一個任務對應一條記錄
mysql> select TRIGGER_NAME,from_unixtime(NEXT_FIRE_TIME/1000),from_unixtime(PREV_FIRE_TIME/1000),from_unixtime(START_TIME/1000),TRIGGER_STATE from QRTZ_TRIGGERS;
+------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| TRIGGER_NAME | from_unixtime(NEXT_FIRE_TIME/1000) | from_unixtime(PREV_FIRE_TIME/1000) | from_unixtime(START_TIME/1000) | TRIGGER_STATE |
+------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| sampleJobCronTrigger | 2017-09-25 16:05:00.0000 | 2017-09-25 16:00:00.0000 | 2017-09-25 11:55:02.0000 | WAITING |
| sampleJobV2CronTrigger | 2017-09-25 16:05:00.0000 | 2017-09-25 16:00:00.0000 | 2017-09-25 15:57:17.0000 | WAITING |
+------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
2 rows in set (0.00 sec)
案例1:線上有個任務有問題,立即kill掉jvm,但是機器重啟後任務還是會執行,怎麼解決?
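答案:在機器重啟前刪除掉QRTZ_FIRED_TRIGGERS中對應的記錄。原因是該Job的REQUESTS_RECOVERY為1(見下方截圖),排程器重啟時會從QRTZ_FIRED_TRIGGERS中找到被中斷的記錄,並據此建立一個以recover_開頭的恢復trigger,重新執行該任務。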
資料庫截圖如下:
mysql> select * from QRTZ_FIRED_TRIGGERS;;
+-----------------+-------------------------------------------------------+----------------------+---------------+------------------------------------------+---------------+---------------+----------+-----------+-----------------+-----------+------------------+-------------------+
| SCHED_NAME | ENTRY_ID | TRIGGER_NAME | TRIGGER_GROUP | INSTANCE_NAME | FIRED_TIME | SCHED_TIME | PRIORITY | STATE | JOB_NAME | JOB_GROUP | IS_NONCONCURRENT | REQUESTS_RECOVERY |
+-----------------+-------------------------------------------------------+----------------------+---------------+------------------------------------------+---------------+---------------+----------+-----------+-----------------+-----------+------------------+-------------------+
| quartzScheduler | caowenyideMacBook-Pro.local15063261797261506326179669 | sampleJobCronTrigger | DEFAULT | caowenyideMacBook-Pro.local1506326179726 | 1506327900007 | 1506327900000 | 5 | EXECUTING | sampleJobDetail | DEFAULT | 1 | 1 |
+-----------------+-------------------------------------------------------+----------------------+---------------+------------------------------------------+---------------+---------------+----------+-----------+-----------------+-----------+------------------+-------------------+
1 row in set (0.00 sec)
ERROR:
No query specified
mysql> select TRIGGER_NAME,from_unixtime(NEXT_FIRE_TIME/1000),from_unixtime(PREV_FIRE_TIME/1000),from_unixtime(START_TIME/1000),TRIGGER_STATE from QRTZ_TRIGGERS;
+------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| TRIGGER_NAME | from_unixtime(NEXT_FIRE_TIME/1000) | from_unixtime(PREV_FIRE_TIME/1000) | from_unixtime(START_TIME/1000) | TRIGGER_STATE |
+------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| sampleJobCronTrigger | 2017-09-25 16:30:00.0000 | 2017-09-25 16:25:00.0000 | 2017-09-25 11:55:02.0000 | BLOCKED |
| sampleJobV2CronTrigger | 2017-09-25 16:30:00.0000 | 2017-09-25 16:25:00.0000 | 2017-09-25 16:18:00.0000 | WAITING |
+------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
2 rows in set (0.00 sec)
mysql> select TRIGGER_NAME,from_unixtime(NEXT_FIRE_TIME/1000),from_unixtime(PREV_FIRE_TIME/1000),from_unixtime(START_TIME/1000),TRIGGER_STATE from QRTZ_TRIGGERS;
+----------------------------------------------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| TRIGGER_NAME | from_unixtime(NEXT_FIRE_TIME/1000) | from_unixtime(PREV_FIRE_TIME/1000) | from_unixtime(START_TIME/1000) | TRIGGER_STATE |
+----------------------------------------------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
| recover_caowenyideMacBook-Pro.local1506326179726_1506328016310 | NULL | 2017-09-25 16:25:00.0000 | 2017-09-25 16:25:00.0000 | COMPLETE |
| sampleJobCronTrigger | 2017-09-25 16:30:00.0000 | 2017-09-25 16:25:00.0000 | 2017-09-25 11:55:02.0000 | BLOCKED |
| sampleJobV2CronTrigger | 2017-09-25 16:30:00.0000 | 2017-09-25 16:25:00.0000 | 2017-09-25 16:18:00.0000 | WAITING |
+----------------------------------------------------------------+------------------------------------+------------------------------------+--------------------------------+---------------+
3 rows in set (0.00 sec)