1. 程式人生 > >Quartz框架多個trigger任務執行出現漏執行的問題分析

Quartz框架多個trigger任務執行出現漏執行的問題分析

一、問題描述
使用Quartz配置定時任務,配置了超過10個定時任務,這些定時任務配置的觸發時間都是5分鐘執行一次,實際執行時,發現總有幾個定時任務不能執行到。

二、示例程式
1、簡單介紹
採用spring+quartz整合方案實現定時任務,Quartz的SchedulerFactoryBean配置引數中不注入taskExecutor屬性,使用預設自帶的執行緒池。準備了15個定時任務,全部設定為每隔10秒觸發一次,定時任務的實現邏輯是使用休眠8秒的方式模擬執行定時任務的時間耗費。

2、配置檔案資訊如下(節選):

<bean id="startQuertz" lazy-init
="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
<property name="triggers"> <list> <ref bean="testMethod1Trigger"/> <ref bean="testMethod2Trigger"/> // 以下省略13個 觸發器的配置 </list
>
</property> </bean> <bean id="testMethod1Trigger" class="org.springframework.scheduling.quartz.CronTriggerFactoryBean"> <property name="jobDetail" ref="testMethod1" /> <!-- 指定Cron表示式:每10秒觸發一次 --> <property name="cronExpression"
value="0/10 * * * * ?"/>
</bean> <bean id="testMethod1" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"> <property name="targetObject" ref="triggerService" /> <!-- 要執行的方法名稱 --> <property name="targetMethod" value="method1" /> </bean> // 以下省略14個定時任務的配置

3、Java定時任務類程式如下(節選)

@Service("triggerService")
public class TriggerService {

    private int cnt1;


    public void method1() {
        try {
            Thread.sleep(8000);
        } catch (InterruptedException e) {
        }
        cnt1++;
    }

    public void print() {
        StringBuffer sb = new StringBuffer();
        sb.append("\nmethod1:" + cnt1);
        sb.append("\nmethod2:" + cnt2);
        sb.append("\nmethod3:" + cnt3);
        sb.append("\nmethod4:" + cnt4);
        sb.append("\nmethod5:" + cnt5);
        sb.append("\nmethod6:" + cnt6);
        sb.append("\nmethod7:" + cnt7);
        sb.append("\nmethod8:" + cnt8);
        sb.append("\nmethod9:" + cnt9);
        sb.append("\nmethod10:" + cnt10);
        sb.append("\nmethod11:" + cnt11);
        sb.append("\nmethod12:" + cnt12);
        sb.append("\nmethod13:" + cnt13);
        sb.append("\nmethod14:" + cnt14);
        sb.append("\nmethod15:" + cnt15);
        System.out.println(sb.toString());
    }
}

實現邏輯很簡單,總共定義15個方法,方法內休眠6秒,同時每個方法都使用一個成員變數記錄被呼叫的次數,並在該類的print()方法裡統一輸出所有方法呼叫次數的概況。

4、client啟動程式如下:

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:applicationContext.xml")
public class TriggerServiceTest extends TestCase {

    @Autowired
    private TriggerService triggerService;

    @Test
    public void testService() {
        try {
            while (true) {
                Thread.sleep(11000);
                triggerService.print();
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

一個簡單的單元測試用例,每隔11秒呼叫一次定時任務服務類的print()方法,輸出定時任務呼叫次數的統計值。

5、執行結果
我們讓這個demo程式跑了幾分鐘,控制檯輸出的取樣結果如下:

method1:25
method2:25
method3:25
method4:25
method5:12
method6:12
method7:12
method8:12
method9:12
method10:25
method11:25
method12:25
method13:25
method14:25
method15:25

6、結果分析
此次取樣的資料結果表示:15個任務中,有10個執行了25次,另外5個只執行了12次,執行的次數不一樣,說明在定時任務排程過程中,有的任務會被遺漏不執行,目前的實驗結果能夠重現上文描述的問題。

三、原始碼分析
剛開始我們對此也是感覺到很疑惑,因為任務被漏執行時,沒有任何警告或報錯的日誌資訊,這個問題若在實際生產中出現了,很難查明原因。
我們來看一下相關的原始碼實現,希望能在原始碼中發現一些有價值的資訊:
1)SchedulerFactoryBean類的初始化操作
其中關於執行緒池屬性注入的相關程式碼如下(省略了部分程式碼):

/**
 * Load and/or apply Quartz properties to the given SchedulerFactory.
 * @param schedulerFactory the SchedulerFactory to initialize
 */
private void initSchedulerFactory(SchedulerFactory schedulerFactory) throws SchedulerException, IOException {
    if (!(schedulerFactory instanceof StdSchedulerFactory)) {
        if (this.configLocation != null || this.quartzProperties != null ||
                this.taskExecutor != null || this.dataSource != null) {
            throw new IllegalArgumentException(
                    "StdSchedulerFactory required for applying Quartz properties: " + schedulerFactory);
        }
        // Otherwise assume that no initialization is necessary...
        return;
    }

    // 省略其他程式碼...

    // 此為需要關注的程式碼
    if (this.taskExecutor != null) {
        mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS,
                LocalTaskExecutorThreadPool.class.getName());
    }
    else {
        // Set necessary default properties here, as Quartz will not apply
        // its default configuration when explicitly given properties.
        mergedProps.setProperty(StdSchedulerFactory.PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());
        mergedProps.setProperty(PROP_THREAD_COUNT, Integer.toString(DEFAULT_THREAD_COUNT));
    }

    // 省略其他程式碼...
}

此程式碼的邏輯是,如果taskExecutor屬性有注入值,就使用指定的執行緒池,一般Spring是會配置執行緒池的,執行緒池的引數可以自行指定。如果taskExecutor未注入值,就使用org.quartz.simple.SimpleThreadPool執行緒池,DEFAULT_THREAD_COUNT的值為10,即該執行緒池的大小為10。
我們現在演示的場景是未設定taskExecutor的,所以執行緒池是SimpleThreadPool的例項物件,池的大小為10。

2)執行過程中,定時任務的觸發過程
首先,要從執行緒池獲取可用資源,該實現在org.quartz.core.QuartzSchedulerThread執行緒類的run方法中,如程式碼所示:

int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();

//這個方法的實現在SimpleThreadPool類裡
    public int blockForAvailableThreads() {
        synchronized(nextRunnableLock) {

            while((availWorkers.size() < 1 || handoffPending) && !isShutdown) {
                try {
                    nextRunnableLock.wait(500);
                } catch (InterruptedException ignore) {
                }
            }

            return availWorkers.size();
        }
    }

注意這個獲取執行緒池資源的方法是阻塞式的,若執行緒池資源不夠用,會一直等待直至獲取到可用的資源。這裡是產生等待的原因。

然後我們看一下定時任務允許被觸發的條件,實現的原始碼還是在
org.quartz.core.QuartzSchedulerThread執行緒類的run方法中:

try {
    triggers = qsRsrcs.getJobStore().acquireNextTriggers(
            now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());
    lastAcquireFailed = false;
    if (log.isDebugEnabled())
        log.debug("batch acquisition of " + (triggers == null ? 0 : triggers.size()) + " triggers");
} catch (JobPersistenceException jpe) {
    if(!lastAcquireFailed) {
        qs.notifySchedulerListenersError(
            "An error occurred while scanning for the next triggers to fire.",jpe);
    }
    lastAcquireFailed = true;
    continue;
} catch (RuntimeException e) {
    if(!lastAcquireFailed) {
        getLog().error("quartzSchedulerThreadLoop: RuntimeException "
                +e.getMessage(), e);
    }
    lastAcquireFailed = true;
    continue;
}

最關鍵的是acquireNextTriggers方法,這個方法是獲取所有可用的觸發器,定位到org.quartz.simpl.RAMJobStore實現類中,程式碼如下:

/**
 * <p>
 * Get a handle to the next trigger to be fired, and mark it as 'reserved'
 * by the calling scheduler.
 * </p>
 *
 * @see #releaseAcquiredTrigger(OperableTrigger)
 */
public List<OperableTrigger> acquireNextTriggers(long noLaterThan, int maxCount, long timeWindow) {
    synchronized (lock) {
        List<OperableTrigger> result = new ArrayList<OperableTrigger>();
        Set<JobKey> acquiredJobKeysForNoConcurrentExec = new HashSet<JobKey>();
        Set<TriggerWrapper> excludedTriggers = new HashSet<TriggerWrapper>();
        long firstAcquiredTriggerFireTime = 0;

        // return empty list if store has no triggers.
        if (timeTriggers.size() == 0)
            return result;

        while (true) {
            TriggerWrapper tw;

            try {
                tw = timeTriggers.first();
                if (tw == null)
                    break;
                timeTriggers.remove(tw);
            } catch (java.util.NoSuchElementException nsee) {
                break;
            }

            if (tw.trigger.getNextFireTime() == null) {
                continue;
            }

            if (applyMisfire(tw)) {
                if (tw.trigger.getNextFireTime() != null) {
                    timeTriggers.add(tw);
                }
                continue;
            }

            if (tw.getTrigger().getNextFireTime().getTime() > noLaterThan + timeWindow) {
                timeTriggers.add(tw);
                break;
            }

            // 省略部分程式碼...

            if (result.size() == maxCount)
                break;
        }

        // If we did excluded triggers to prevent ACQUIRE state due to DisallowConcurrentExecution, we need to add them back to store.
        if (excludedTriggers.size() > 0)
            timeTriggers.addAll(excludedTriggers);
        return result;
    }
}

請注意一下while迴圈內呼叫的applyMisfire方法,實現如下:

protected boolean applyMisfire(TriggerWrapper tw) {

    long misfireTime = System.currentTimeMillis();
    if (getMisfireThreshold() > 0) {
        misfireTime -= getMisfireThreshold();
    }

    Date tnft = tw.trigger.getNextFireTime();
    if (tnft == null || tnft.getTime() > misfireTime
            || tw.trigger.getMisfireInstruction() == Trigger.MISFIRE_INSTRUCTION_IGNORE_MISFIRE_POLICY) {
        return false;
    }

    // 省略其他程式碼...

    return true;
}

以上原始碼為了節省篇幅有部分省略,有興趣的可以自行閱讀完整程式碼。

注意一下這裡返回為false的判斷邏輯,這個方法返回為false,表示acquireNextTriggers將不再接收這個定時任務,並且沒有任何資訊輸出,這樣該定時任務在觸發過程中就被忽略不執行了。

順便留意一下misfireTime,它取當前的時間點,另外減小了5秒鐘(減小的時間引數可以設定,預設是5秒),如果我們把tnft.getTime()理解為定時任務預先設定的執行時間,那麼”nextFireTime + misfireThreshold”我們可以理解為任務執行的過期時間,misfireTime這個變數是用來跟nextFireTime比較的引數,如果nextFireTime大於misfireTime,即任務當前執行的時間點大於過期時間”nextFireTime + misfireThreshold”,表示任務已經超過了等待的限度,那麼這個任務就不再被執行了。
簡單地說,就是一個定時任務經過獲取可用的執行緒池資源,到執行這段邏輯的時間,如果5秒內無法完成的話, 這個任務就不再執行了。

回想我們的演示案例,定時任務是超過了10個,就肯定存線上程池資源獲取等待的問題,而每個定時任務的方法是休眠6秒鐘,又超過了5秒的限度,所以每次排程時,總有一些任務是被略過了的。

四、解決方案
經過以上分析,我們已經瞭解到出現些問題的原因,解決方案有兩種:
1、注入taskExecutor屬性,保證執行緒池資源是夠用的。
2、各個定時任務錯峰觸發。
演示案例的定時任務觸發時間均為10秒一次,錯峰時間配置可以參照素數原理,減小衝突可能性,比如配置時間為5分鐘,7分鐘,11分鐘,13分鐘,17分鐘等,這樣高峰相遇的概率會低一些。
以上兩個方案可根據實際情況挑選,也可以組合使用。

五、總結
1、經過閱讀原始碼分析,可以瞭解到兩個關鍵點:執行緒池資源獲取等待定時任務過期作廢機制。
2、Quartz框架的定時任務執行是絕對時間觸發的,所以存在“過期不候”的現象。
3、在使用Quartzs框架時,一定要預先計算好triggers數量與執行緒池大小的匹配程度,資源一定要夠,或者任務執行密度不能太大,否則等到執行緒任務釋放完,trigger早已過期,就無法按預期時間觸發了。

六、FAQ
Q1、Quartz框架使用絕對時間觸發機制有什麼好處?
A1、我個人覺得這種機制對執行環境是一種過載保護,如果任務負荷過重,已經來不及執行的,就適當放棄。如此一來,我們使用就需要注意實際業務場景這種特性的存在,並通過適當增加執行緒資源,減小任務執行密度,任務錯峰觸發等方法來避免這種情況發生。只是個人見解,僅作參考。