1. 程式人生 > >quartz 2.2.x 原始碼學習 基本執行流程分析

quartz 2.2.x 原始碼學習 基本執行流程分析

quartz 官網中給出了一些基本概念,請先閱讀官網相關概念。

下面對最簡單的一個任務排程工作進行分析,下面的程式碼每隔三秒中不斷重複執行任務SimpleJob。

public class JobStart {
    public static void main(String[] args) throws SchedulerException {
        SchedulerFactory sf = new StdSchedulerFactory();
        Scheduler sched = sf.getScheduler();

        sched.clear();

        String instanceId = sched.getSchedulerInstanceId();
        JobDetail detail = JobBuilder.newJob(SimpleJob.class)
                                     .withIdentity("Ins Id "
+ instanceId, instanceId) .requestRecovery() .build(); Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("Ins Id " + instanceId,instanceId) .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(3
).repeatForever()).build(); sched.scheduleJob(detail, trigger); sched.start(); } } @DisallowConcurrentExecution public class SimpleJob implements Job{ private static Logger _log = LoggerFactory.getLogger(SimpleJob.class); @Override public void execute(JobExecutionContext context) throws
JobExecutionException { if(context.isRecovering()){ _log.info("is recovering"); }else{ _log.info("not recovering"); } try { _log.info("do time consumed job start"); Thread.sleep(5000); _log.info("do time consumed job end"); } catch (InterruptedException e) { e.printStackTrace(); } } }

quartz 執行任務可以總結為建立JobDetail描述待執行任務,建立Trigger描述什麼時候去觸發任務。最後將JobDetail與Trigger註冊在排程器中,排程器啟動後遍開始對任務排程。

因此瞭解其基本過程需要明白 建立排程器完成哪些工作?排程工作什麼時候開始?任務如何被執行?

示例程式碼中,首先獲取一個排程器。

Scheduler sched = sf.getScheduler(); 

進入該方法內部

org.quartz.impl.StdSchedulerFactory.getScheduler() 
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();
        return sched;

獲取排程器物件時,首先查詢快取,如果有就直接拿,若不存在則呼叫instantiate()方法來建立排程器物件。

進入instantiate方法內部,定義了許多變數。

org.quartz.impl.StdSchedulerFactory.instantiate()
   private Scheduler instantiate() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        if (initException != null) {
            throw initException;
        }

        JobStore js = null;
        ThreadPool tp = null;
        QuartzScheduler qs = null;
        DBConnectionManager dbMgr = null;
        String instanceIdGeneratorClass = null;
        Properties tProps = null;
        ......

ThreadPool tp為執行緒池物件,在該方法內部可以看到該執行緒池的建立 是通過從屬性檔案quartz.properties中拿到org.quartz.threadPool所定義的線城池類來建立。本文使用的是 org.quartz.simpl.SimpleThreadPool。而最終任務通過執行緒池來管理。

 public static final String PROP_THREAD_POOL_PREFIX = "org.quartz.threadPool";

        String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

        if (tpClass == null) {
            initException = new SchedulerException(
                    "ThreadPool class not specified. ");
            throw initException;
        }

        try {
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' could not be instantiated.", e);
            throw initException;
        }

後續程式碼中會建立QuartzScheduler物件,最終來啟動調動執行緒。

            qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);

在該構造器內部,會建立QuartzSchedulerThread物件,而QuartzSchedulerThread負責對我們定義的Job進行排程。當建立完後
提交給執行器物件schedThreadExecutor.execute(this.schedThread);準備執行。

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

當sf.getScheduler();執行完畢後,執行緒池,執行器,排程器,負責排程Job的執行緒都已建立完畢。而任務排程執行緒QuartzSchedulerThread也準備執行。

可以看到sf.getScheduler();呼叫完畢後,任務排程執行緒處於暫停狀態,並不斷檢查狀態等待恢復並執行任務。

        org.quartz.core.QuartzSchedulerThread.run()

                while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

在 QuartzSchedulerThread 構造方法中會設定兩個變數初始值。

        paused = true;
        halted = new AtomicBoolean(false);

當我們獲取到排程器物件,並指定trigger與detail後,呼叫start方法。

當呼叫執行器的 org.quartz.core.QuartzScheduler.start()方法在該方法內部會呼叫方法 schedThread.togglePause(false);將paused變數設定為false,並喚醒QuartzSchedulerThread執行緒。隨QuartzSchedulerThread執行緒開始執行排程任務。

        org.quartz.core.QuartzScheduler.start()

            void togglePause(boolean pause) {
        synchronized (sigLock) {
            paused = pause;

            if (paused) {
                signalSchedulingChange(0);
            } else {
                sigLock.notifyAll();
            }
        }

至此任務排程工作準備開始,Job等待被呼叫。

QuartzSchedulerThread的run方法中檢測到變數狀態被修改,開始排程工作。

    public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }

                int availThreadCount = qsRsrcs.getThreadPool().blockForAvailableThreads();
                if(availThreadCount > 0) { // will always be true, due to semantics of blockForAvailableThreads...

                    List<OperableTrigger> triggers = null;

                    long now = System.currentTimeMillis();

                    clearSignaledSchedulingChange();
... 省略

run方法中的執行過程可粗略歸納為
1.查詢將要觸發的Trigger
2.檢測條件,等待執行
3.執行任務。
4.釋放Trigger

在執行時,任務被包裝為JobRunShell來執行。

當qsRsrcs.getThreadPool().runInThread(shell)被呼叫時,我們自己的Job被提交,並等待執行緒池排程執行。

        JobRunShell shell = null; 

                                    JobRunShell shell = null;
                            try {
                                shell = qsRsrcs.getJobRunShellFactory().createJobRunShell(bndle);
                                shell.initialize(qs);
                            } catch (SchedulerException se) {
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                                continue;
                            }

                            if (qsRsrcs.getThreadPool().runInThread(shell) == false) {
                                // this case should never happen, as it is indicative of the
                                // scheduler being shutdown or a bug in the thread pool or
                                // a thread pool being used concurrently - which the docs
                                // say not to do...
                                getLog().error("ThreadPool.runInThread() return false!");
                                qsRsrcs.getJobStore().triggeredJobComplete(triggers.get(i), bndle.getJobDetail(), CompletedExecutionInstruction.SET_ALL_JOB_TRIGGERS_ERROR);
                            }

在 qsRsrcs.getThreadPool().runInThread(shell)方法中會建立一個執行緒來執行我們定義的Job,而Job的資訊則是通過JobDetail來獲取。經過上述過程就形成了quartz 進行任務排程的基本過程。

最後總結,啟動時的三個主要方法,及工作概述如下。
Scheduler sched = sf.getScheduler();
完成了執行緒池建立,排程執行緒建立,排程執行緒初始處理暫停狀態。
sched.scheduleJob(detail, trigger);
指定JobDetail與Trigger。
sched.start();
修改狀態未,使排程執行緒開始執行。任務以JobRunShell的形式被執行。

相關推薦

quartz 2.2.x 原始碼學習 基本執行流程分析

quartz 官網中給出了一些基本概念,請先閱讀官網相關概念。 下面對最簡單的一個任務排程工作進行分析,下面的程式碼每隔三秒中不斷重複執行任務SimpleJob。 public class JobStart { public static v

mybatis原始碼學習執行過程分析2)——config.xml配置檔案和mapper.xml對映檔案解析過程

在上一篇中跟蹤了SqlSessionFactory及SqlSession的建立過程。這一篇,主要跟蹤Mapper介面和XML檔案對映及獲取。 1.xml檔案的解析 1.1Mybatis-config.xml的解析 在SqlSessionFactor

Struts 2 Spring Hibernate三大框架的執行流程以及原理

freemark 步驟 二維 ring logs spa att spring 添加 轉:http://www.cnblogs.com/System-out-println/p/5974113.html Struts2框架 一、簡介 Struts2是一個相當強大的Ja

TBSchedule原始碼學習筆記-執行緒組任務排程

根據上文的啟動過程,找到了執行緒組的實現。com.taobao.pamirs.schedule.taskmanager.TBScheduleManager /** * 1、任務排程分配器的目標: 讓所有的任務不重複,不遺漏的被快速處理。 * 2、

Java併發包原始碼學習執行緒池(一)ThreadPoolExecutor原始碼分析

Java中使用執行緒池技術一般都是使用Executors這個工廠類,它提供了非常簡單方法來建立各種型別的執行緒池: public static ExecutorService newFixedThreadPool(int nThreads) public static ExecutorService

[筆記]架構探險-從零開始寫JavaWeb框架-2.1. 之使框架具有aop特性-aop框架載入與切面執行流程分析

囉嗦一句:本筆記只是自己在學習過程中的一些分析和理解,看的人不一定能看懂.如果有興趣還是去買這本書看.筆記就當是另外一種解說好了 在本章節中會學習到如下的技術: 如何理解並使用代理技術 如何使用Spring提供的AOP技術(忽略,太多知識) 如何使

dlmalloc 2.8.6 原始碼詳解—[6]除錯分析

分析堆溢位漏洞不能僅僅停留在瞭解dlmalloc堆分配器執行原理,還需要通過偵錯程式觀察其執行過程並在堆記憶體中識別堆的資料結構,這就像實習醫生學習做手術一樣,最終檢驗還是要在手術檯上。 1 除錯程式碼 #include<stdio.h>

原始碼學習執行緒池

大家面試過程中肯定被問道過執行緒池。為什麼要使用執行緒池呢?因為在系統中頻繁建立執行緒會造成很大的CPU消耗。而且用完的執行緒要等待GC回收也會造成消耗。 下面我們就來學習下最常用的執行緒池 ThreadPoolExecutor, 首先先來看看它的構造方法: public ThreadPo

linux-2.6.22.6內核啟動流程分析之配置

linux 分享圖片 src image 比較 文件包含 子目錄 2.6 config 配置過程最終結果是生成.config文件,我們想要對配置的目的有很清楚的了解,必須先對.config文件進行分析。通過cd命令切換到linux-2.6.22.6內核目錄,輸入vi .co

spring原始碼學習之路---深度分析IOC容器初始化過程(三)

分析FileSystemXmlApplicationContext的建構函式,到底都做了什麼,導致IOC容器初始化成功。 public FileSystemXmlApplicationContext(String[] configLocations, boolean ref

K8S 原始碼探祕 之 kubeadm upgrade apply 執行流程分析

一、引言        本文將基於 Kubernetes 1.12 版本,分析 kubeadm  upgrade apply 的執行流程,希望對讀者理解 k8s 有幫助!        關

K8S 原始碼探祕 之 kubeadm join 執行流程分析

一、引言        本文將基於 Kubernetes 1.12 版本,分析 kubeadm  join 的執行流程,希望對讀者理解 k8s 有幫助!        關於 init 流程

K8S 原始碼探祕 之 kubeadm init 執行流程分析

一、引言        kubeadm 是 k8s 重要的快速部署工具,也是其原生支援的部署工具,在實現自動化部署方面具有重要的研究價值。        本文將基於 Kubernetes 1.12 版本,分析

深入淺出Mybatis---SQL執行流程分析原始碼篇)

最近太忙了,一直沒時間繼續更新部落格,今天忙裡偷閒繼續我的Mybatis學習之旅。在前九篇中,介紹了mybatis的配置以及使用, 那麼本篇將走進mybatis的原始碼,分析mybatis 的執行流程, 好啦,鄙人不喜歡口水話,還是直接上幹活吧: 1. SqlSessionFactory 與 S

Spark學習筆記(10)—— wordcount 執行流程分析

1 啟動叢集 啟動 HDFS start-dfs.sh 啟動 Spark 叢集 /home/hadoop/apps/spark-1.6.3-bin-hadoop2.6/sbin/start-all

(Android) OkHttp3.11 原始碼學習筆記 9 CallServerInterceptor分析

這個攔截器主要負責向伺服器發起真正的網路請求,並接收到response,再返回,下面為主要的intercept方法原始碼 @Override public Response intercept(Chain chain) throws IOException { RealIntercept

(Android) OkHttp3.10 原始碼學習筆記 9 ConnectInterceptor分析

首先我們還是去看它的intercept方法 @Override public Response intercept(Chain chain) throws IOException { RealInterceptorChain realChain = (RealInterceptorChai

CUDPP基本執行過程分析

CUDPPResult cudppPlan(constCUDPPHandle cudppHandle, CUDPPHandle*planHandle, CUDPPConfiguration config, size_t numElements, size_t n

地球氣候系統模式cesm的基本執行流程

最近在向師兄學習氣候模式CESM (Community Earth System Model),然而師兄似乎很忙的樣子,於是只能自己看CESM官網的使用者手冊學習:http://www.cesm.ucar.edu/models/cesm1.2/cesm/doc/usersg

i2c驅動程式(2) i2c_driver probe被呼叫的流程分析

(本文為個人的筆記 難免有錯 望各位高人賜教指正 謝謝!) i2c驅動程式i2c_driver probe被呼叫的流程分析 step1 i2c_add_driver(&at24_driver); step2 i2c_register_driver(T