Quartz與Spring集成——創建調度器

分類:IT技術 時間:2016-10-17

前言

在《Quartz與Spring集成—— SchedulerFactoryBean的初始化分析》一文中介紹過Spring集成Quartz時的初始化過程,其中簡單的提到了創建調度器的方法createScheduler。本文將著重介紹Quartz初始化時是如何創建調度器的。

創建調度器

這裏從createScheduler的實現(見代碼清單1)來分析,其處理步驟如下:

  1. 設置線程上下文的類加載器;
  2. 通過單例方法獲取SchedulerRepository的實例(見代碼清單2);
  3. 從調度倉庫實例SchedulerRepository中查找已經存在的調度器,這裏想說的是雖然此實現考慮到了多線程安全問題,不過這種方式效率較低。不如提前初始化,由final修飾,不使用同步語句,避免線程間的阻塞和等待;
  4. 獲取調取器(見代碼清單3),其實際上首先從調度器緩存中查找調度器,否則調用instantiate方法創建調度器;
  5. 檢查重新獲取的調度器和已經存在的調度器是否相同,如果相同則說明此調度器已經被激活了,將會報出異常。如果調度器是首次被激活,那麽將返回此調度器。這裏的實現稍微有些拖沓,其實只有當existingScheduler為null時,才會調用instantiate方法創建newScheduler,也只有在這個時候newScheduler才不等於existingScheduler,並且不會拋出異常。因此我們甚至可以省去Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);這行代碼,而直接將代碼清單3中的實現進行修改——當sched為null時才調用instantiate方法創建調度器。

代碼清單1

	protected Scheduler createScheduler(SchedulerFactory schedulerFactory, String schedulerName)
			throws SchedulerException {

		// Override thread context ClassLoader to work around naive Quartz ClassLoadHelper loading.
		Thread currentThread = Thread.currentThread();
		ClassLoader threadContextClassLoader = currentThread.getContextClassLoader();
		boolean overrideClassLoader = (this.resourceLoader != null &&
				!this.resourceLoader.getClassLoader().equals(threadContextClassLoader));
		if (overrideClassLoader) {
			currentThread.setContextClassLoader(this.resourceLoader.getClassLoader());
		}
		try {
			SchedulerRepository repository = SchedulerRepository.getInstance();
			synchronized (repository) {
				Scheduler existingScheduler = (schedulerName != null ? repository.lookup(schedulerName) : null);
				Scheduler newScheduler = schedulerFactory.getScheduler();
				if (newScheduler == existingScheduler) {
					throw new IllegalStateException("Active Scheduler of name '" + schedulerName + "' already registered " +
							"in Quartz SchedulerRepository. Cannot create a new Spring-managed Scheduler of the same name!");
				}
				if (!this.exposeSchedulerInRepository) {
					// Need to remove it in this case, since Quartz shares the Scheduler instance by default!
					SchedulerRepository.getInstance().remove(newScheduler.getSchedulerName());
				}
				return newScheduler;
			}
		}
		finally {
			if (overrideClassLoader) {
				// Reset original thread context ClassLoader.
				currentThread.setContextClassLoader(threadContextClassLoader);
			}
		}
	}

代碼清單2

    public static synchronized SchedulerRepository getInstance() {
        if (inst == null) {
            inst = new SchedulerRepository();
        }

        return inst;
    }

代碼清單3
    public Scheduler getScheduler() throws SchedulerException {
        if (cfg == null) {
            initialize();
        }

        SchedulerRepository schedRep = SchedulerRepository.getInstance();

        Scheduler sched = schedRep.lookup(getSchedulerName());

        if (sched != null) {
            if (sched.isShutdown()) {
                schedRep.remove(getSchedulerName());
            } else {
                return sched;
            }
        }

        sched = instantiate();

        return sched;
    }

讀取調度器配置

instantiate方法中包含了很多從PropertiesParser(PropertiesParser在《Quartz與Spring集成—— SchedulerFactoryBean的初始化分析》一文中介紹過)中獲取各種屬性的語句,這裏不過多展示。重點來看其更為本質的內容。

創建遠端調度器代理

如果當前調度器實際是代理遠程RMI調度器,那麽創建RemoteScheduler,並將當前調取器與RemoteScheduler進行綁定,最後以此RemoteScheduler作為調度器,見代碼清單4。

代碼清單4

        if (rmiProxy) {

            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

            String uid = (rmiBindName == null) ? QuartzSchedulerResources.getUniqueIdentifier(
                    schedName, schedInstId) : rmiBindName;

            RemoteScheduler remoteScheduler = new RemoteScheduler(uid, rmiHost, rmiPort);

            schedRep.bind(remoteScheduler);

            return remoteScheduler;
        }

創建遠端jmx調度器代理

如果當前調度器實際是代理遠程JMX調度器,那麽創建RemoteMBeanScheduler,並將當前調度器與RemoteMBeanScheduler進行綁定,最後以此RemoteMBeanScheduler作為調度器,見代碼清單5。

代碼清單5

        if (jmxProxy) {
            if (autoId) {
                schedInstId = DEFAULT_INSTANCE_ID;
            }

            if (jmxProxyClass == null) {
                throw new SchedulerConfigException("No JMX Proxy Scheduler class provided");
            }

            RemoteMBeanScheduler jmxScheduler = null;
            try {
                jmxScheduler = (RemoteMBeanScheduler)loadHelper.loadClass(jmxProxyClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate RemoteMBeanScheduler class.", e);
            }

            if (jmxObjectName == null) {
                jmxObjectName = QuartzSchedulerResources.generateJMXObjectName(schedName, schedInstId);
            }

            jmxScheduler.setSchedulerObjectName(jmxObjectName);

            tProps = cfg.getPropertyGroup(PROP_SCHED_JMX_PROXY, true);
            try {
                setBeanProps(jmxScheduler, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("RemoteMBeanScheduler class '"
                        + jmxProxyClass + "' props could not be configured.", e);
                throw initException;
            }

            jmxScheduler.initialize();

            schedRep.bind(jmxScheduler);

            return jmxScheduler;
        }

實例化作業工廠

如果指定了jobFactoryClass屬性,那麽實例化作業工廠實例,見代碼清單6。實例化的JobFactory將用於創建調度作業。

代碼清單6

        JobFactory jobFactory = null;
        if(jobFactoryClass != null) {
            try {
                jobFactory = (JobFactory) loadHelper.loadClass(jobFactoryClass)
                        .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate JobFactory class: "
                                + e.getmessage(), e);
            }

            tProps = cfg.getPropertyGroup(PROP_SCHED_JOB_FACTORY_PREFIX, true);
            try {
                setBeanProps(jobFactory, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("JobFactory class '"
                        + jobFactoryClass + "' props could not be configured.", e);
                throw initException;
            }
        }

實例化實例ID生成器

如果指定了instanceIdGeneratorClass屬性,那麽實例化實例ID生成器,見代碼清單7。此生成器用來給調度器實例生成ID。

代碼清單7

        InstanceIdGenerator instanceIdGenerator = null;
        if(instanceIdGeneratorClass != null) {
            try {
                instanceIdGenerator = (InstanceIdGenerator) loadHelper.loadClass(instanceIdGeneratorClass)
                    .newInstance();
            } catch (Exception e) {
                throw new SchedulerConfigException(
                        "Unable to instantiate InstanceIdGenerator class: "
                        + e.getMessage(), e);
            }

            tProps = cfg.getPropertyGroup(PROP_SCHED_INSTANCE_ID_GENERATOR_PREFIX, true);
            try {
                setBeanProps(instanceIdGenerator, tProps);
            } catch (Exception e) {
                initException = new SchedulerException("InstanceIdGenerator class '"
                        + instanceIdGeneratorClass + "' props could not be configured.", e);
                throw initException;
            }
        }

實例化線程池

org.quartz.threadPool.class屬性用於指定線程池類,如果沒有指定,則默認為org.quartz.simpl.SimpleThreadPool,見代碼清單8。此線程池將用於shell作業的執行。

代碼清單8

        String tpClass = cfg.getStringProperty(PROP_THREAD_POOL_CLASS, SimpleThreadPool.class.getName());

        if (tpClass == null) {
            initException = new SchedulerException(
                    "ThreadPool class not specified. ");
            throw initException;
        }

        try {
            tp = (ThreadPool) loadHelper.loadClass(tpClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' could not be instantiated.", e);
            throw initException;
        }
        tProps = cfg.getPropertyGroup(PROP_THREAD_POOL_PREFIX, true);
        try {
            setBeanProps(tp, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("ThreadPool class '"
                    + tpClass + "' props could not be configured.", e);
            throw initException;
        }

實例化JobStore的具體實例

org.quartz.jobStore.class屬性用於指定JobStore的具體類型,我顯示指定了org.springframework.scheduling.quartz.LocalDataSourceJobStore,如果沒有指定,則默認為RAMJobStore,見代碼清單9。jobStore顧名思義,就是作業的存儲,以LocalDataSourceJobStore為例,將通過它對觸發器、作業等內容進行增刪改查。

代碼清單9

        String jsClass = cfg.getStringProperty(PROP_JOB_STORE_CLASS,
                RAMJobStore.class.getName());

        if (jsClass == null) {
            initException = new SchedulerException(
                    "JobStore class not specified. ");
            throw initException;
        }

        try {
            js = (JobStore) loadHelper.loadClass(jsClass).newInstance();
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' could not be instantiated.", e);
            throw initException;
        }

        SchedulerDetailsSetter.setDetails(js, schedName, schedInstId);

        tProps = cfg.getPropertyGroup(PROP_JOB_STORE_PREFIX, true, new String[] {PROP_JOB_STORE_LOCK_HANDLER_PREFIX});
        try {
            setBeanProps(js, tProps);
        } catch (Exception e) {
            initException = new SchedulerException("JobStore class '" + jsClass
                    + "' props could not be configured.", e);
            throw initException;
        }

獲取數據庫管理器並設置數據庫連接池

這一步驟的執行邏輯比較多,但是仔細整理後發現數據庫管理器都一樣,無非是數據連接池的提供者不同(見代碼清單10),一共分為三種:

方式一:連接池提供者由connectionProvider.class屬性指定;

方式二:連接池提供者由jndiURL屬性指定;

方式三:連接池提供者為PoolingConnectionProvider,其使用了C3P0連接池;

代碼清單10

        String[] dsNames = cfg.getPropertyGroups(PROP_DATASOURCE_PREFIX);
        for (int i = 0; i < dsNames.length; i++) {
            PropertiesParser pp = new PropertiesParser(cfg.getPropertyGroup(
                    PROP_DATASOURCE_PREFIX + "." + dsNames[i], true));

            String cpClass = pp.getStringProperty(PROP_CONNECTION_PROVIDER_CLASS, null);

            // custom connectionProvider...
            if(cpClass != null) {
                ConnectionProvider cp = null;
                try {
                    cp = (ConnectionProvider) loadHelper.loadClass(cpClass).newInstance();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' could not be instantiated.", e);
                    throw initException;
                }

                try {
                    // remove the class name, so it isn't attempted to be set
                    pp.getUnderlyingProperties().remove(
                            PROP_CONNECTION_PROVIDER_CLASS);

                    if (cp instanceof PoolingConnectionProvider) {
                        populateProviderWithExtraProps((PoolingConnectionProvider)cp, pp.getUnderlyingProperties());
                    } else {
                        setBeanProps(cp, pp.getUnderlyingProperties());
                    }
                    cp.initialize();
                } catch (Exception e) {
                    initException = new SchedulerException("ConnectionProvider class '" + cpClass
                            + "' props could not be configured.", e);
                    throw initException;
                }

                dbMgr = DBConnectionManager.getInstance();
                dbMgr.addConnectionProvider(dsNames[i], cp);
            } else {
                String dsJndi = pp.getStringProperty(PROP_DATASOURCE_JNDI_URL, null);

                if (dsJndi != null) {
                    boolean dsAlwaysLookup = pp.getBooleanProperty(
                            PROP_DATASOURCE_JNDI_ALWAYS_LOOKUP);
                    String dsJndiInitial = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_INITIAL);
                    String dsJndiProvider = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_PROVDER);
                    String dsJndiPrincipal = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_PRINCIPAL);
                    String dsJndiCredentials = pp.getStringProperty(
                            PROP_DATASOURCE_JNDI_CREDENTIALS);
                    Properties props = null;
                    if (null != dsJndiInitial || null != dsJndiProvider
                            || null != dsJndiPrincipal || null != dsJndiCredentials) {
                        props = new Properties();
                        if (dsJndiInitial != null) {
                            props.put(PROP_DATASOURCE_JNDI_INITIAL,
                                    dsJndiInitial);
                        }
                        if (dsJndiProvider != null) {
                            props.put(PROP_DATASOURCE_JNDI_PROVDER,
                                    dsJndiProvider);
                        }
                        if (dsJndiPrincipal != null) {
                            props.put(PROP_DATASOURCE_JNDI_PRINCIPAL,
                                    dsJndiPrincipal);
                        }
                        if (dsJndiCredentials != null) {
                            props.put(PROP_DATASOURCE_JNDI_CREDENTIALS,
                                    dsJndiCredentials);
                        }
                    }
                    JNDIConnectionProvider cp = new JNDIConnectionProvider(dsJndi,
                            props, dsAlwaysLookup);
                    dbMgr = DBConnectionManager.getInstance();
                    dbMgr.addConnectionProvider(dsNames[i], cp);
                } else {
                    String dsDriver = pp.getStringProperty(PoolingConnectionProvider.DB_DRIVER);
                    String dsURL = pp.getStringProperty(PoolingConnectionProvider.DB_URL);

                    if (dsDriver == null) {
                        initException = new SchedulerException(
                                "Driver not specified for DataSource: "
                                        + dsNames[i]);
                        throw initException;
                    }
                    if (dsURL == null) {
                        initException = new SchedulerException(
                                "DB URL not specified for DataSource: "
                                        + dsNames[i]);
                        throw initException;
                    }
                    try {
                        PoolingConnectionProvider cp = new PoolingConnectionProvider(pp.getUnderlyingProperties());
                        dbMgr = DBConnectionManager.getInstance();
                        dbMgr.addConnectionProvider(dsNames[i], cp);

                        // Populate the underlying C3P0 data source pool properties
                        populateProviderWithExtraProps(cp, pp.getUnderlyingProperties());
                    } catch (Exception sqle) {
                        initException = new SchedulerException(
                                "Could not initialize DataSource: " + dsNames[i],
                                sqle);
                        throw initException;
                    }
                }

            }

        }

設置調度器插件

這一段用於設置各種調度器插件,見代碼清單11。這裏的PROP_PLUGIN_PREFIX的值為org.quartz.plugin,即可以在Quartz的屬性文件中配置一系列以org.quartz.plugin為前綴的插件,例如可以在關閉JVM時,添加鉤子做一些清理工作的插件org.quartz.plugins.management.ShutdownHookPlugin。

代碼清單11

        String[] pluginNames = cfg.getPropertyGroups(PROP_PLUGIN_PREFIX);
        SchedulerPlugin[] plugins = new SchedulerPlugin[pluginNames.length];
        for (int i = 0; i < pluginNames.length; i++) {
            Properties pp = cfg.getPropertyGroup(PROP_PLUGIN_PREFIX + "."
                    + pluginNames[i], true);

            String plugInClass = pp.getProperty(PROP_PLUGIN_CLASS, null);

            if (plugInClass == null) {
                initException = new SchedulerException(
                        "SchedulerPlugin class not specified for plugin '"
                                + pluginNames[i] + "'");
                throw initException;
            }
            SchedulerPlugin plugin = null;
            try {
                plugin = (SchedulerPlugin)
                        loadHelper.loadClass(plugInClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "SchedulerPlugin class '" + plugInClass
                                + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                setBeanProps(plugin, pp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobStore SchedulerPlugin '" + plugInClass
                                + "' props could not be configured.", e);
                throw initException;
            }

            plugins[i] = plugin;
        }

設置作業監聽器

這一步用於設置作業監聽器,我覺得可以用於做一些作業監控相關的擴展,見代明清單12。這裏的常量PROP_JOB_LISTENER_PREFIX的值為org.quartz.jobListener。我們可以在Quartz屬性文件添加以org.quartz.jobListener為前綴的作業監聽器。

代明清單12

        Class<?>[] strArg = new Class[] { String.class };
        String[] jobListenerNames = cfg.getPropertyGroups(PROP_JOB_LISTENER_PREFIX);
        JobListener[] jobListeners = new JobListener[jobListenerNames.length];
        for (int i = 0; i < jobListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_JOB_LISTENER_PREFIX + "."
                    + jobListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

            if (listenerClass == null) {
                initException = new SchedulerException(
                        "JobListener class not specified for listener '"
                                + jobListenerNames[i] + "'");
                throw initException;
            }
            JobListener listener = null;
            try {
                listener = (JobListener)
                       loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobListener class '" + listenerClass
                                + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try { 
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                }
                catch(NoSuchMethodException ignore) { 
                    /* do nothing */ 
                }
                if(nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] {jobListenerNames[i] } );
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "JobListener '" + listenerClass
                                + "' props could not be configured.", e);
                throw initException;
            }
            jobListeners[i] = listener;
        }

設置觸發器監聽器

這一步設置觸發器監聽器,見代碼清單13。與作業監聽器類似,不再贅述。

代碼清單13

        String[] triggerListenerNames = cfg.getPropertyGroups(PROP_TRIGGER_LISTENER_PREFIX);
        TriggerListener[] triggerListeners = new TriggerListener[triggerListenerNames.length];
        for (int i = 0; i < triggerListenerNames.length; i++) {
            Properties lp = cfg.getPropertyGroup(PROP_TRIGGER_LISTENER_PREFIX + "."
                    + triggerListenerNames[i], true);

            String listenerClass = lp.getProperty(PROP_LISTENER_CLASS, null);

            if (listenerClass == null) {
                initException = new SchedulerException(
                        "TriggerListener class not specified for listener '"
                                + triggerListenerNames[i] + "'");
                throw initException;
            }
            TriggerListener listener = null;
            try {
                listener = (TriggerListener)
                       loadHelper.loadClass(listenerClass).newInstance();
            } catch (Exception e) {
                initException = new SchedulerException(
                        "TriggerListener class '" + listenerClass
                                + "' could not be instantiated.", e);
                throw initException;
            }
            try {
                Method nameSetter = null;
                try { 
                    nameSetter = listener.getClass().getMethod("setName", strArg);
                }
                catch(NoSuchMethodException ignore) { /* do nothing */ }
                if(nameSetter != null) {
                    nameSetter.invoke(listener, new Object[] {triggerListenerNames[i] } );
                }
                setBeanProps(listener, lp);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "TriggerListener '" + listenerClass
                                + "' props could not be configured.", e);
                throw initException;
            }
            triggerListeners[i] = listener;
        }

獲取線程執行器

可以通過屬性org.quartz.threadExecutor.class設置線程執行器,如果沒有指定,默認為DefaultThreadExecutor,見代碼清單13。此線程執行器用於執行定時調度線程QuartzSchedulerThread(有關QuartzSchedulerThread的執行過程將會在單獨的博文中展開)。

代碼清單13

        String threadExecutorClass = cfg.getStringProperty(PROP_THREAD_EXECUTOR_CLASS);
        if (threadExecutorClass != null) {
            tProps = cfg.getPropertyGroup(PROP_THREAD_EXECUTOR, true);
            try {
                threadExecutor = (ThreadExecutor) loadHelper.loadClass(threadExecutorClass).newInstance();
                log.info("Using custom implementation for ThreadExecutor: " + threadExecutorClass);

                setBeanProps(threadExecutor, tProps);
            } catch (Exception e) {
                initException = new SchedulerException(
                        "ThreadExecutor class '" + threadExecutorClass + "' could not be instantiated.", e);
                throw initException;
            }
        } else {
            log.info("Using default implementation for ThreadExecutor");
            threadExecutor = new DefaultThreadExecutor();
        }

創建腳本執行工廠

如果需要作業運行在事務中(可以通過屬性org.quartz.scheduler.wrapJobExecutionInUserTransaction指定),則創建JTAJobRunShellFactory,否則創建JTAAnnotationAwareJobRunShellFactory,見代碼清單14。JobRunShellFactory將用於生成作業的shell執行對象JobRunShell。

代碼清單14

            JobRunShellFactory jrsf = null; // Create correct run-shell factory...
    
            if (userTXLocation != null) {
                UserTransactionHelper.setUserTxLocation(userTXLocation);
            }
    
            if (wrapJobInTx) {
                jrsf = new JTAJobRunShellFactory();
            } else {
                jrsf = new JTAAnnotationAwareJobRunShellFactory();
            }

生成調度實例ID

如果需要自動生成調度實例ID(可以通過屬性org.quartz.scheduler.instanceId為AUTO或者SYS_PROP,其中當指定為AUTO時,則instanceIdGeneratorClass由org.quartz.scheduler.instanceIdGenerator.class屬性指定,默認為org.quartz.simpl.SimpleInstanceIdGenerator;當指定為SYS_PROP,則instanceIdGeneratorClass等於org.quartz.simpl.systemPropertyInstanceIdGenerator),那麽調度實例ID為NON_CLUSTERED,當JobStore支持集群部署,那麽調度實例ID將由調度實例ID生成器instanceIdGenerator產生,見代碼清單15。(註:當不需要自動生成調度實例ID時,可以通過屬性org.quartz.scheduler.instanceId指定)

代碼清單15

            if (autoId) {
                try {
                  schedInstId = DEFAULT_INSTANCE_ID;
                  if (js.isClustered()) {
                      schedInstId = instanceIdGenerator.generateInstanceId();
                  }
                } catch (Exception e) {
                    getLog().error("Couldn't generate instance Id!", e);
                    throw new IllegalStateException("Cannot run without an instance id.");
                }
            }

設置JobStore的數據庫錯誤重試的間隔及現場執行器

JobStoreSupport是JobStore的抽象實現類,只有繼承自JobStoreSupport的具體實現類(例如org.springframework.scheduling.quartz.LocalDataSourceJobStore)才可以通過調用其setDbRetryInterval方法設置數據庫錯誤重試間隔(dbFailureRetry屬性默認為15000,也可以通過設置org.quartz.scheduler.dbFailureRetryInterval屬性進行指定),setThreadExecutor方法用於設置JobStoreSupport的線程執行器,見代碼清單16。

代碼清單16

            if (js instanceof JobStoreSupport) {
                JobStoreSupport jjs = (JobStoreSupport)js;
                jjs.setDbRetryInterval(dbFailureRetry);
                if(threadsInheritInitalizersClassLoader)
                    jjs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
                
                jjs.setThreadExecutor(threadExecutor);
            }

構造QuartzSchedulerResources

QuartzSchedulerResources用於持有定時調度需要的各種資源,如作業運行腳本的工廠、執行QuartzSchedulerThread的線程執行器、執行具體shell作業的線程池、各種插件、監聽器等。在構造QuartzSchedulerResources的過程中(見代碼清單17),設置了很多屬性,現在列舉如下:

屬性名稱含義備註
name調度名稱可以由org.quartz.scheduler.instanceName屬性指定
threadName調度線程名稱可以由org.quartz.scheduler.threadName屬性指定,默認等於調度名稱加後綴_QuartzSchedulerThread產生
instanceId調度實例ID可以由org.quartz.scheduler.instanceId屬性指定,具體生成規則見文中描述
jobRunShellFactory創建作業運行腳本工廠可以由org.quartz.scheduler.wrapJobExecutionInUserTransaction屬性指定,具體實現有JTAJobRunShellFactory和JTAAnnotationAwareJobRunShellFactory兩種
makeSchedulerThreadDaemon調度線程是否是後臺線程可以由org.quartz.scheduler.makeSchedulerThreadDaemon屬性指定
threadsInheritInitalizersClassLoader線程是否繼承初始化的類加載器可以由org.quartz.scheduler.threadsInheritContextClassLoaderOfInitializer屬性指定
runupdateCheck運行時是否檢查Quartz的可用更新版本可以由org.quartz.scheduler.skipUpdateCheck屬性指定,runUpdateCheck與指定值相反
batchTimeWindow在時間窗口前批量觸發可以由org.quartz.scheduler.batchTriggerAcquisitionFireAheadTimeWindow屬性指定
maxBatchSize最大批量執行的作業數可以由org.quartz.scheduler.batchTriggerAcquisitionMaxCount屬性指定
interruptJobsOnShutdown當關閉作業時,中斷作業線程可以由org.quartz.scheduler.interruptJobsOnShutdown屬性指定
interruptJobsOnShutdownWithWait當關閉作業時,等待中斷作業線程可以由org.quartz.scheduler.interruptJobsOnShutdownWithWait屬性指定
threadExecutor線程執行器可以由org.quartz.threadExecutor.class屬性指定,默認為DefaultThreadExecutor
threadPool線程池可以由org.quartz.threadPool.class屬性指定,默認為SimpleThreadPool
jobStore作業存儲可以由org.quartz.jobStore.class屬性指定,默認為RAMJobStore


代碼清單17

            QuartzSchedulerResources rsrcs = new QuartzSchedulerResources();
            rsrcs.setName(schedName);
            rsrcs.setThreadName(threadName);
            rsrcs.setInstanceId(schedInstId);
            rsrcs.setJobRunShellFactory(jrsf);
            rsrcs.setMakeSchedulerThreadDaemon(makeSchedulerThreadDaemon);
            rsrcs.setThreadsInheritInitializersClassLoadContext(threadsInheritInitalizersClassLoader);
            rsrcs.setRunUpdateCheck(!skipUpdateCheck);
            rsrcs.setBatchTimeWindow(batchTimeWindow);
            rsrcs.setMaxBatchSize(maxBatchSize);
            rsrcs.setInterruptJobsOnShutdown(interruptJobsOnShutdown);
            rsrcs.setInterruptJobsOnShutdownWithWait(interruptJobsOnShutdownWithWait);
            rsrcs.setJMXExport(jmxExport);
            rsrcs.setJMXObjectName(jmxObjectName);

            if (managementRESTServiceEnabled) {
                ManagementRESTServiceConfiguration managementRESTServiceConfiguration = new ManagementRESTServiceConfiguration();
                managementRESTServiceConfiguration.setBind(managementRESTServiceHostAndPort);
                managementRESTServiceConfiguration.setEnabled(managementRESTServiceEnabled);
                rsrcs.setManagementRESTServiceConfiguration(managementRESTServiceConfiguration);
            }
    
            if (rmiExport) {
                rsrcs.setRMIRegistryHost(rmiHost);
                rsrcs.setRMIRegistryPort(rmiPort);
                rsrcs.setRMIServerPort(rmiServerPort);
                rsrcs.setRMICreateRegistryStrategy(rmiCreateRegistry);
                rsrcs.setRMIBindName(rmiBindName);
            }
    
            SchedulerDetailsSetter.setDetails(tp, schedName, schedInstId);

            rsrcs.setThreadExecutor(threadExecutor);
            threadExecutor.initialize();

            rsrcs.setThreadPool(tp);
            if(tp instanceof SimpleThreadPool) {
                if(threadsInheritInitalizersClassLoader)
                    ((SimpleThreadPool)tp).setThreadsInheritContextClassLoaderOfInitializingThread(threadsInheritInitalizersClassLoader);
            }
            tp.initialize();
            tpInited = true;
    
            rsrcs.setJobStore(js);
    
            // add plugins
            for (int i = 0; i < plugins.length; i++) {
                rsrcs.addSchedulerPlugin(plugins[i]);
            }

構造QuartzScheduler

構造QuartzScheduler的代碼如下:

            qs = new QuartzScheduler(rsrcs, idleWaitTime, dbFailureRetry);
            qsInited = true;

QuartzScheduler的構造器實現見代碼清單18,其處理步驟如下:
  1. 設置QuartzSchedulerResources;
  2. QuartzSchedulerResources設置的JobStore如果實現了JobListener接口,那麽將其作為作業監聽器添加到監聽器列表;
  3. 構造線程QuartzSchedulerThread實例;
  4. 從QuartzSchedulerResources中獲取設置的線程執行器;
  5. 啟動QuartzSchedulerThread;
  6. 創建執行作業管理器ExecutingJobsManager,由於其實現了JobListener,所以加入了內置的作業監聽器中;
  7. 創建錯誤日誌組件ErrorLogger,由於繼承了SchedulerListenerSupport,所以加入了內置的調度監聽器中;
  8. 構造SchedulerSignalerImpl,此組件的作業包括:向QuartzScheduler中註冊的觸發器監聽器發送觸發器失常或者觸發器再也不會被觸發的信號、修改觸發器下次觸發的時間、向QuartzScheduler中註冊的調度監聽器發送作業被刪除或者調度異常的信號;
  9. 當shouldRunUpdateCheck為true是則調用scheduleUpdateCheck方法(見代明清單19),實際是利用定時器定時執行UpdateChecker任務,此任務用於檢查Quartz的可用的更新版本;為了提高性能,可以將屬性org.quartz.scheduler.skipUpdateCheck設置為true;

代碼清單18

    public QuartzScheduler(QuartzSchedulerResources resources, long idleWaitTime, @Deprecated long dbRetryInterval)
        throws SchedulerException {
        this.resources = resources;
        if (resources.getJobStore() instanceof JobListener) {
            addInternalJobListener((JobListener)resources.getJobStore());
        }

        this.schedThread = new QuartzSchedulerThread(this, resources);
        ThreadExecutor schedThreadExecutor = resources.getThreadExecutor();
        schedThreadExecutor.execute(this.schedThread);
        if (idleWaitTime > 0) {
            this.schedThread.setIdleWaitTime(idleWaitTime);
        }

        jobMgr = new ExecutingJobsManager();
        addInternalJobListener(jobMgr);
        errLogger = new ErrorLogger();
        addInternalSchedulerListener(errLogger);

        signaler = new SchedulerSignalerImpl(this, this.schedThread);
        
        if(shouldRunUpdateCheck()) 
            updateTimer = scheduleUpdateCheck();
        else
            updateTimer = null;
        
        getLog().info("Quartz Scheduler v." + getversion() + " created.");
    }

   代碼清單19

    private Timer scheduleUpdateCheck() {
        Timer rval = new Timer(true);
        rval.scheduleAtFixedRate(new UpdateChecker(), 1000, 7 * 24 * 60 * 60 * 1000L);
        return rval;
    }

構造QuartzSchedulerThread

這裏再詳細分析下QuartzSchedulerThread的構造過程,其構造器見代碼清單20。

代碼清單20

    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs) {
        this(qs, qsRsrcs, qsRsrcs.getMakeSchedulerThreadDaemon(), Thread.NORM_PRIORITY);
    }
QuartzSchedulerThread的構造器又代理了另一個構造器,見代碼清單21。

代碼清單21

    QuartzSchedulerThread(QuartzScheduler qs, QuartzSchedulerResources qsRsrcs, boolean setDaemon, int threadPrio) {
        super(qs.getSchedulerThreadGroup(), qsRsrcs.getThreadName());
        this.qs = qs;
        this.qsRsrcs = qsRsrcs;
        this.setDaemon(setDaemon);
        if(qsRsrcs.isThreadsInheritInitializersClassLoadContext()) {
            log.info("QuartzSchedulerThread Inheriting ContextClassLoader of thread: " + Thread.currentThread().getName());
            this.setContextClassLoader(Thread.currentThread().getContextClassLoader());
        }

        this.setPriority(threadPrio);

        // start the underlying thread, but put this object into the 'paused'
        // state
        // so processing doesn't start yet...
        paused = true;
        halted = new AtomicBoolean(false);
    }
代碼清單21比較簡單,QuartzScheduler的getSchedulerThreadGroup方法用於創建線程組,QuartzSchedulerResources的isThreadsInheritInitializersClassLoadContext方法實際獲取QuartzSchedulerResources的屬性threadsInheritInitializersClassLoadContext,此屬性如果為真,則設置QuartzSchedulerThread的線程上下文類加載器為當前線程的類加載器,設置paused標誌為true,以便於QuartzSchedulerThread線程不能開始處理。halted可以解釋為叫停當前線程的執行。

阻止QuartzSchedulerThread的執行

由於在構造QuartzScheduler的過程中已經啟動了QuartzSchedulerThread,那麽勢必導致此線程的執行,其run方法的部分代碼見代碼清單22.

代碼清單22

    public void run() {
        boolean lastAcquireFailed = false;

        while (!halted.get()) {
            try {
                // check if we're supposed to pause...
                synchronized (sigLock) {
                    while (paused && !halted.get()) {
                        try {
                            // wait until togglePause(false) is called...
                            sigLock.wait(1000L);
                        } catch (InterruptedException ignore) {
                        }
                    }

                    if (halted.get()) {
                        break;
                    }
                }
我們並未叫停調度線程的執行,所以halted屬性等於false,對於paused標誌而言,這裏涉及多線程安全問題,所以這裏使用了同步塊,但是實際上可以通過調整代碼將paused用volatile修飾,這樣通過內存可見性省去同步,能夠提高性能。由於paused標誌在線程剛開始執行時為false,那麽這裏的white循環將不斷輪詢,每次循環線程wait一秒。既然QuartzSchedulerThread已經開始執行,但是卻又無法執行,豈不是自相矛盾?雖然QuartzSchedulerThread線程開始啟動,但是QuartzScheduler並未準備好這一切,必須等到QuartzScheduler準備時將paused修改為false。雖說這樣實現也是可以的,但是在QuartzScheduler準備好的這段時間內,QuartzSchedulerThread線程頻繁的睡眠、被喚醒,線程上下文來回切換,耗費了一些性能。何不等到QuartzScheduler準備好時再啟動QuartzSchedulerThread線程呢?

創建調度器

創建調度器的代碼如下:

            // Create Scheduler ref...
            Scheduler scheduler = instantiate(rsrcs, qs);

這裏創建調度器時以,實際是用StdScheduler將之前創建的QuartzScheduler進行了封裝,代碼如下:

    protected Scheduler instantiate(QuartzSchedulerResources rsrcs, QuartzScheduler qs) {

        Scheduler scheduler = new StdScheduler(qs);
        return scheduler;
    }

其他處理

剩余的工作包括:設置作業工廠,對插件初始化,給QuartzScheduler的監聽器管理器註冊作業監聽器和觸發器監聽器,設置調度器上下文屬性,觸發JobStore,觸發腳本運行工廠,將調度器註冊到SchedulerRepository等,見代碼清單23。

代碼清單23

    
            // set job factory if specified
            if(jobFactory != null) {
                qs.setJobFactory(jobFactory);
            }
    
            // Initialize plugins now that we have a Scheduler instance.
            for (int i = 0; i < plugins.length; i++) {
                plugins[i].initialize(pluginNames[i], scheduler, loadHelper);
            }
    
            // add listeners
            for (int i = 0; i < jobListeners.length; i++) {
                qs.getListenerManager().addJobListener(jobListeners[i], EverythingMatcher.allJobs());
            }
            for (int i = 0; i < triggerListeners.length; i++) {
                qs.getListenerManager().addTriggerListener(triggerListeners[i], EverythingMatcher.allTriggers());
            }
    
            // set scheduler context data...
            for(Object key: schedCtxtProps.keySet()) {
                String val = schedCtxtProps.getProperty((String) key);    
                scheduler.getContext().put((String)key, val);
            }
    
            // fire up job store, and runshell factory
    
            js.setInstanceId(schedInstId);
            js.setInstanceName(schedName);
            js.setThreadPoolSize(tp.getPoolSize());
            js.initialize(loadHelper, qs.getSchedulerSignaler());

            jrsf.initialize(scheduler);
            
            qs.initialize();
    
            getLog().info(
                    "Quartz scheduler '" + scheduler.getSchedulerName()
                            + "' initialized from " + propSrc);
    
            getLog().info("Quartz scheduler version: " + qs.getVersion());
    
            // prevents the repository from being garbage collected
            qs.addNoGCObject(schedRep);
            // prevents the db manager from being garbage collected
            if (dbMgr != null) {
                qs.addNoGCObject(dbMgr);
            }
    
            schedRep.bind(scheduler);
            return scheduler;

總結

可以看到創建調度器的過程,幾乎完全是順序編程,步驟也十分清楚。但是可以看到其中可以優化的地方也比較多,另外代碼組織稍微不太合理,例如instantiate方法的長度1355-579=776行。創建完調度器還應該考慮如何啟動它,請接著看《Quartz與Spring集成——啟動調度器》一文。



後記:個人總結整理的《深入理解Spark:核心思想與源碼分析》一書現在已經正式出版上市,目前京東、當當、天貓等網站均有銷售,歡迎感興趣的同學購買。


京東:http://item.jd.com/11846120.html 

當當:http://product.dangdang.com/23838168.html 



Tags: 上下文 多線程 null 倉庫 如何

文章來源:


ads
ads

相關文章
ads

相關文章

ad