1. 程式人生 > >Quartz 2.2 的實現原理和執行過程

Quartz 2.2 的實現原理和執行過程

一、Quartz 的幾個概念類

這幾個概念類,是我們呼叫Quartz任務排程的基礎。瞭解清楚之後,我們再來看一下如何去啟動和關閉一個Quartz排程程式。

  • 1、org.quartz.Job
    它是一個抽象介面,表示一個工作,也就是我們要執行的具體內容,他只定義了一個幾口方法:
    void execute(JobExecutionContext context)
    作用等同Spring的:
    org.springframework.scheduling.quartz.QuartzJobBean
  • 2、org.quartz.JobDetail
    JobDetail表示一個具體的可執行的排程程式,Job是這個可執行程排程程式所要執行的內容,它包含了這個任務排程的方案和策略。
    他的實現類:
    org.quartz.impl.JobDetailImpl

    作用等同Spring:
    org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean
  • 3、org.quartz.Trigger
    它是一個抽象介面,表示一個排程引數的配置,通過配置它,來告訴排程容器什麼時候去呼叫JobDetail。
    他的兩個實現類:
    org.quartz.impl.triggers.SimpleTriggerImpl
    org.quartz.impl.triggers.CronTriggerImpl
    等同於Spring的:
    org.springframework.scheduling.quartz.SimpleTriggerBean

    org.springframework.scheduling.quartz.CronTriggerBean
    前者只支援按照一定頻度呼叫任務,如每隔30分鐘執行一次。
    後者既支援按照一定頻度呼叫任務,又支援定時任務。
  • 4、org.quartz.Scheduler
    代表一個排程容器,一個排程容器中可以註冊多個JobDetail和Trigger。當Trigger與JobDetail組合,就可以被Scheduler容器排程了。它的方法有start()、shutdown()等方法,負責管理整個排程作業。
    等同Spring的: org.springframework.scheduling.quartz.SchedulerFactoryBean

二、Quartz 入門示例

import org.quartz.JobBuilder;
import org.quartz.JobDetail;
import org.quartz.Scheduler;
import org.quartz.SchedulerException;
import org.quartz.SchedulerFactory;
import org.quartz.SimpleScheduleBuilder;
import org.quartz.Trigger;
import org.quartz.TriggerBuilder;
import org.quartz.impl.StdSchedulerFactory;

public class Quartz {

    public static void main(String[] args) {
        // 1、建立JobDetial物件 , 並且設定選項
        JobDetail jobDetail= JobBuilder.newJob(MyJob.class).withIdentity("testJob_1","group_1").build();

        // 2、通過 TriggerBuilder 建立Trigger物件
        TriggerBuilder<Trigger> triggerBuilder = TriggerBuilder.newTrigger();
        triggerBuilder.withIdentity("trigger_1", "group_1");
        triggerBuilder.startNow();
        // 設定重複次數和間隔時間
        triggerBuilder.withSchedule(SimpleScheduleBuilder.simpleSchedule()
                .withIntervalInMilliseconds(1) //時間間隔
                .withRepeatCount(5) // 重複次數
         );
        // 設定停止時間
        //triggerBuilder.endAt(new Date(System.currentTimeMillis() + 3));
        // 建立Trigger物件
        Trigger trigger = triggerBuilder.build();

        // 3、建立Scheduler物件,並配置JobDetail和Trigger物件
        SchedulerFactory sf = new StdSchedulerFactory();
        try {
            Scheduler scheduler = sf.getScheduler();
            scheduler.scheduleJob(jobDetail, trigger);
            // 4、並執行啟動、關閉等操作
            scheduler.start();

          //關閉排程器
          //scheduler.shutdown(true); 
        } catch (SchedulerException e) {
            e.printStackTrace(); 
        }
    }
}
import java.util.Date;

import org.quartz.Job;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

public class MyJob implements Job{

    public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
        System.out.println(new Date() + ": doing something...");
    }
}

執行結果:

Wed Aug 03 19:12:53 CST 2016: doing something…
Wed Aug 03 19:12:53 CST 2016: doing something…
Wed Aug 03 19:12:53 CST 2016: doing something…
Wed Aug 03 19:12:53 CST 2016: doing something…
Wed Aug 03 19:12:53 CST 2016: doing something…
Wed Aug 03 19:12:53 CST 2016: doing something…

當把結束時間改為:

// 設定停止時間
triggerBuilder.endAt(new Date(System.currentTimeMillis() + 3));

執行結果:

Wed Aug 03 19:17:12 CST 2016: doing something…
Wed Aug 03 19:17:12 CST 2016: doing something…
Wed Aug 03 19:17:12 CST 2016: doing something…

結果分析:
可以看出,設定重複執行5次,沒設定停止時間的時候,執行了6次,當停止時間設定為new Date(System.currentTimeMillis() + 3時, 果只執行了3次。 也就是說當到了停止時間,不管執行的次數是否到達最大次數,都不在執行了。

當添加了關閉排程器:

//關閉排程器
scheduler.shutdown(true); 

執行結果:

Wed Aug 03 19:17:12 CST 2016: doing something…

三、Quartz 簡單總結

  • 1、scheduler.shutdown(true); // true表示等到本次執行結束
  • 2、為了配置強大時間排程策略,可以研究專門的CronTrigger。
  • 3、整合Spring時,Quartz容器關閉方式有兩種方法,一種是關閉- Spring容器,一種是獲取到SchedulerFactoryBean例項,然後呼叫一個shutdown就搞定了。
  • 4、Quartz的JobDetail、Trigger都可以在執行時重新設定,並且在下次呼叫時候起作用。這就為動態作業的實現提供了依據。你可以將排程時間策略存放到資料庫,然後通過資料庫資料來設定Trigger,這樣就能產生動態的排程。
  • 5、JobDetail不儲存具體的例項,但它允許你定義一個例項,JobDetail 又指向JobDataMap。 JobDetail持有Job的詳細資訊,如它所屬的組,名稱等資訊
  • 6、JobDataMap儲存著任務例項的物件,並保持著他們狀態資訊,它是Map介面的實現,即你可以往裡面put和get一些你想儲存和獲取的資訊.
  • 7、scheduler容器包含多個JobDetail和Trigger。scheduler是個容器,容器中有一個執行緒池,用來並行排程執行每個作業,這樣可以提高容器效率。

四、Quartz 執行過程

quartz執行時由QuartzSchedulerThread類作為主體,迴圈執行排程流程。JobStore作為中間層,按照quartz的併發策略執行資料庫操作,完成主要的排程邏輯。JobRunShellFactory負責例項化JobDetail物件,將其放入執行緒池執行。LockHandler負責獲取LOCKS表中的資料庫鎖。
整個quartz對任務排程的時序大致如下:

0.排程器執行緒run()

1.獲取待觸發trigger
1.1讀取JobDetail資訊
1.2讀取trigger表中觸發器資訊並標記為”已獲取”

2.觸發trigger
2.1確認trigger的狀態
2.2讀取trigger的JobDetail資訊
2.3讀取trigger的Calendar資訊
2.4更新trigger資訊

3例項化並執行Job
3.1從執行緒池獲取執行緒執行JobRunShell的run方法

我們看到初始化一個排程器需要用工廠類獲取例項:

SchedulerFactory sf = new StdSchedulerFactory();
Scheduler sch = sf.getScheduler(); 
// 然後啟動:
sch.start();

下面跟進StdSchedulerFactory的getScheduler()方法:

public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }
        SchedulerRepository schedRep = SchedulerRepository.getInstance();
        //從"排程器倉庫"中根據properties的SchedulerName配置獲取一個排程器例項
        Scheduler sched = schedRep.lookup(getSchedulerName());
        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }
        //初始化排程器
        sched = instantiate();
        return sched;
    }

我們再看一下 sched = instantiate(); 排程器的初始化方法
- 讀取配置資源,
- 生成QuartzScheduler物件,
- 建立該物件的執行執行緒,並啟動執行緒;
- 初始化JobStore,QuartzScheduler,DBConnectionManager等重要元件,
- 至此,排程器的初始化工作已完成,初始化工作中quratz讀取了資料庫中存放的對應當前排程器的鎖資訊,對應CRM中的表QRTZ2_LOCKS,中的STATE_ACCESS,TRIGGER_ACCESS兩個LOCK_NAME.

public void initialize(ClassLoadHelper loadHelper,
            SchedulerSignaler signaler) throws SchedulerConfigException {
        if (dsName == null) {
            throw new SchedulerConfigException("DataSource name not set.");
        }
        classLoadHelper = loadHelper;
        if(isThreadsInheritInitializersClassLoadContext()) {
            log.info("JDBCJobStore threads will inherit ContextClassLoader of thread: " + Thread.currentThread().getName());
            initializersLoader = Thread.currentThread().getContextClassLoader();
        }

        this.schedSignaler = signaler;
        // If the user hasn't specified an explicit lock handler, then
        // choose one based on CMT/Clustered/UseDBLocks.
        if (getLockHandler() == null) {

            // If the user hasn't specified an explicit lock handler,
            // then we *must* use DB locks with clustering
            if (isClustered()) {
                setUseDBLocks(true);
            }

            if (getUseDBLocks()) {
                if(getDriverDelegateClass() != null && getDriverDelegateClass().equals(MSSQLDelegate.class.getName())) {
                    if(getSelectWithLockSQL() == null) {
                        //讀取資料庫LOCKS表中對應當前排程器的鎖資訊
                        String msSqlDflt = "SELECT * FROM {0}LOCKS WITH (UPDLOCK,ROWLOCK) WHERE " + COL_SCHEDULER_NAME + " = {1} AND LOCK_NAME = ?";
                        getLog().info("Detected usage of MSSQLDelegate class - defaulting 'selectWithLockSQL' to '" + msSqlDflt + "'.");
                        setSelectWithLockSQL(msSqlDflt);
                    }
                }
                getLog().info("Using db table-based data access locking (synchronization).");
                setLockHandler(new StdRowLockSemaphore(getTablePrefix(), getInstanceName(), getSelectWithLockSQL()));
            } else {
                getLog().info(
                    "Using thread monitor-based data access locking (synchronization).");
                setLockHandler(new SimpleSemaphore());
            }
        }
    }

當呼叫sch.start();方法時, scheduler做了如下工作:
1.通知listener開始啟動
2.啟動排程器執行緒
3.啟動plugin
4.通知listener啟動完成

public void start() throws SchedulerException {
        if (shuttingDown|| closed) {
            throw new SchedulerException(
                    "The Scheduler cannot be restarted after shutdown() has been called.");
        }
        // QTZ-212 : calling new schedulerStarting() method on the listeners
        // right after entering start()
        //通知該排程器的listener啟動開始
        notifySchedulerListenersStarting();
        if (initialStart == null) {
            initialStart = new Date();
            //啟動排程器的執行緒
            this.resources.getJobStore().schedulerStarted();            
            //啟動plugins
            startPlugins();
        } else {
            resources.getJobStore().schedulerResumed();
        }
        schedThread.togglePause(false);
        getLog().info(
                "Scheduler " + resources.getUniqueIdentifier() + " started.");
        //通知該排程器的listener啟動完成
        notifySchedulerListenersStarted();
    }

排程過程

排程器啟動後,排程器的執行緒就處於執行狀態了,開始執行quartz的主要工作–排程任務. 前面已介紹過,任務的排程過程大致分為三步:
1.獲取待觸發trigger
2.觸發trigger
3.例項化並執行Job
而排程過程的三個步驟就承載在QuartzSchedulerThread這個排程器執行緒類的run()方法中。程式碼大家可以看一下原始碼。我在這裡就略過性的支出run()方法中對應上面三個步驟的原始碼。

觸發器的獲取,run()方法中獲取待觸發trigger
呼叫JobStoreSupport.acquireNextTriggers方法:

 //排程器在trigger佇列中尋找30秒內一定數目的trigger準備執行排程,
                        //引數1:nolaterthan = now+3000ms,引數2 最大獲取數量,大小取執行緒池執行緒剩餘量與定義值得較小者
                        //引數3 時間視窗 預設為0,程式會在nolaterthan後加上視窗大小來選擇trigger
                        triggers = qsRsrcs.getJobStore().acquireNextTriggers(
                                now + idleWaitTime, Math.min(availThreadCount, qsRsrcs.getMaxBatchSize()), qsRsrcs.getBatchTimeWindow());

引數的意義如下:
引數1:nolaterthan = now+3000ms,即未來30s內將會被觸發.
引數2 最大獲取數量,大小取執行緒池執行緒剩餘量與定義值得較小者.
引數3 時間視窗 預設為0,程式會在nolaterthan後加上視窗大小來選擇

觸發trigger: QuartzSchedulerThread.run()
呼叫JobStoreSupport.triggersFired方法:

List<TriggerFiredResult> res = qsRsrcs.getJobStore().triggersFired(triggers);

該方法做了以下工作:
1.獲取trigger當前狀態
2.通過trigger中的JobKey讀取trigger包含的Job資訊
3.將trigger更新至觸發狀態
4.結合calendar的資訊觸發trigger,涉及多次狀態更新
5.更新資料庫中trigger的資訊,包括更改狀態至STATE_COMPLETE,及計算下一次觸發時間.
6.返回trigger觸發結果的資料傳輸類TriggerFiredBundle

Job執行過程:QuartzSchedulerThread.run()

qsRsrcs.getJobStore().releaseAcquiredTrigger(triggers.get(i));
shell.initialize(qs);

為每個Job生成一個可執行的RunShell,並放入執行緒池執行.
在最後排程執行緒生成了一個隨機的等待時間,進入短暫的等待,這使得其他節點的排程器都有機會獲取資料庫資源.如此就實現了quratz的負載平衡.
這樣一次完整的排程過程就結束了.排程器執行緒進入下一次迴圈.

相關推薦

Quartz 2.2實現原理執行過程

一、Quartz 的幾個概念類 這幾個概念類,是我們呼叫Quartz任務排程的基礎。瞭解清楚之後,我們再來看一下如何去啟動和關閉一個Quartz排程程式。 1、org.quartz.Job 它是一個抽象介面,表示一個工作,也就是我們要執行的具體內容,他只

圖解mapreduce原理執行過程

說明: 下面的圖來自南京大學計算機系黃宜華老師開設的mapreduce課程的課件,這裡稍作整理和 總結。 本文旨在對接觸了mapreduce之後,但是對mapreduce的工作流程仍不是很清楚的人員,當然包括博主自己,希望與大家一起學習。 mapreduce的原理

Eureka--2、第一個Eureka程式,Eureka Client的自啟動原理簡要過程

在之前的Spring Cloud Config的基礎上,搭建簡單的Eureka Server 沒有程式碼,很多試驗做不起來,我們先建立個Eureka Server再說。 本篇主要講如何建立Eureka Server和Eureka Client,還有個重點是通過原始碼分析E

2.2--RandomAccessFile實現它的關聯類FileChannel

-- 根據 探討 重要 put input 一個 思考 file   我認為分析源碼是程序設計的一個逆過程,重要的是分析的過程,而且要時不時地換位思考,別人為什麽要這樣設計,多給自己敲敲邊鼓,是提出問題的源泉,有了問題就不怕找不到答案。現在已經大體知道FileChannel

Java多執行緒之AQS(AbstractQueuedSynchronizer )實現原理原始碼分析(三)

章節概覽、 1、回顧 上一章節,我們分析了ReentrantLock的原始碼: 2、AQS 佇列同步器概述 本章節我們深入分析下AQS(AbstractQueuedSynchronizer)佇列同步器原始碼,AQS是用來構建鎖或者其他同步元件的基礎框架。

Java多執行緒之Condition實現原理原始碼分析(四)

章節概覽、 1、概述 上面的幾個章節我們基於lock(),unlock()方法為入口,深入分析了獨佔鎖的獲取和釋放。這個章節我們在此基礎上,進一步分析AQS是如何實現await,signal功能。其功能上和synchronize的wait,notify一樣。

play framework 2.5.3 學習使用過程中的“坑”

play framework 2.5.3學習和使用過程中的“坑” 最近專案需要,接觸到了play, 使用過程中,遇到了一些坑, 記下來。 1. 版本 play分1.x和2.x 兩個系列,差異很大。 2.x系列 中的2.3+也有所不同,(目前官方的說法是 Ligh

20.1 shell指令碼介紹 20.2 shell指令碼結構執行 20.3 date命令用法 20.4 shell指令碼中的變數

20.1 shell指令碼介紹20.2 shell指令碼結構和執行20.3 date命令用法20.4 shell指令碼中的變數shell指令碼介紹shell是一種指令碼語言可以使用邏輯判斷、迴圈等語法可以自定義函式shell是系統命令的集合shell指令碼可以實現自動化運維,

Java多執行緒之ThreadPoolExecutor實現原理原始碼分析(五)

章節概覽、 1、概述 執行緒池的顧名思義,就是執行緒的一個集合。需要用到執行緒,從集合裡面取出即可。這樣設計主要的作用是優化執行緒的建立和銷燬而造成的資源浪費的情況。Java中的執行緒池的實現主要是JUC下面的ThreadPoolExecutor類完成的。下面

JVM——深入解析原理執行機制(一)類載入過程

       隔了好久終於把這篇文章補上了,最近在看《深入理解Java虛擬機器》,一本很不錯的書,必須值得一看。        由於本人對Java類的載入過程一直是一知半解,所以優先看了一下

Java多執行緒之ReentrantLock實現原理原始碼分析(二)

章節概覽、 1、ReentrantLock概述 ReentrantLock字面含義是可重入的互斥鎖,實現了和synchronize關鍵字一樣的獨佔鎖功能。但是ReentrantLock使用的是自旋鎖,通過CAS硬體原語指令實現的輕量級的鎖,不會引起上下文切換

瀏覽器的解析執行過程

們的 由於 繼續 動畫 table 就會 內嵌 cnblogs 內嵌腳本 當瀏覽器獲得一個html文件時,會“自上而下”加載,並在加載過程中進行解析渲染。 解析: 1. 瀏覽器會將HTML解析成一個DOM樹(display:none,visibility:hidden)。

[Go] sync.Pool 的實現原理 適用場景

臨時 digg 簡單的 設置 com 運行 之前 結果 官方文檔 摘錄一: Go 1.3 的 sync 包中加入一個新特性:Pool。 官方文檔可以看這裏 http://golang.org/pkg/sync/#Pool 這個類設計的目的是用來保存和復用臨時對象,以減

Struts2工作原理執行流程圖

過濾器 map filters play servle 同時 cati 通過 spa 在struts2的應用中,從用戶請求到服務器返回相應響應給用戶端的過程中,包含了許多組件如:Controller、ActionProxy、ActionMapping、Configurati

DNS的原理解析過程

dnsDNS的解析原理和過程:在Internet上域名和IP是對應的,DNS解析有兩種:一種是正向解析,另外一種是反向解析。正向解析:正向解析就是將域名轉換成對應的 IP地址的過程,它應用於在瀏覽器地址欄中輸入網站域名時的情形。反向解析:根據IP地址查找對應的註冊域名,經常被一些後臺程序使用,用戶看不到。另外

HashMap實現原理源碼分析

aci 鍵值對 creat 變化 遍歷數組 沖突的解決 查看 seed 二分 作者: dreamcatcher-cx 出處: <http://www.cnblogs.com/chengxiao/>原文:https://www.cnblogs.com/cheng

STL原始碼剖析——stack的實現原理使用方法詳解

Stack 簡介     stack 是堆疊容器,是一種“先進後出”的容器。     stack 是簡單地裝飾 deque 容器而成為另外一種容器。     使用 stack 時需要加上標頭檔案 #include<s

STL原始碼剖析——deque的實現原理使用方法詳解

Deque 簡介     deque是“double—ended queue”的縮寫,和vector一樣都是STL的容器,deque 是雙端陣列,而 vector 是單端的。     deque 在介面上和 vector 非常相似,在許多操作的地方

MapReduce的原理執行過程

MapReduce簡介 1.MapReduce是一種分散式計算模型,是Google提出的,主要用於搜尋領域,解決海量資料的計算問題。 2.MR有兩個階段組成:Map和Reduce,使用者只需實現map()和reduce()兩個函式,即可實現分散式計算。 MapReduce執行流程

HashMap的實現原理底層結構 圖解+原始碼分析

 雜湊表(hash table)也叫散列表,是一種非常重要的資料結構,應用場景及其豐富,許多快取技術(比如memcached)的核心其實就是在記憶體中維護一張大的雜湊表,而HashMap的實現原理也常常出現在各類的面試題中,重要性可見一斑。本文會對java集合框架中的對應實現HashMap的實現原理