1. 程式人生 > >quartz在叢集環境下的最終解決方案

quartz在叢集環境下的最終解決方案

      最近專案中使用了spring+Quartz定時任務、但是專案最近要叢集部署、多個APP下如何利用Quartz 協調處理任務。
      大家可以思考一下、現在有 A、B、C三個應用同時作為叢集伺服器對外統一提供服務、每個應用下各有一個Quartz、它們會按照既定的時間自動執行各自的任務。我們先不說實現什麼功能,就說這樣的架構其實有點像多執行緒。那多執行緒裡就會存在“資源競爭”的問題,即可能產生髒讀,髒寫,由於三臺 應用 裡都有 Quartz,因此會存在重複處理 任務 的現象。
      解決方案一:只在一臺 應用 上裝 Quartz,其它兩臺不裝,這樣叢集就沒有意義了。
      解決方案二:使用其實Quartz自身可以例項化資料庫的特性就可以解決問題

本方案優點:
1.     每臺作為叢集點的 應用上都可以佈署 Quartz ;
2.     Quartz 的 TASK ( 12 張表)例項化如資料庫,基於資料庫引擎及 High-Available 的策略(叢集的一種策略)自動協調每個節點的 QUARTZ ,當任一一節點的 QUARTZ 非正常關閉或出錯時,另幾個節點的 QUARTZ 會自動啟動;
3.    無需開發人員更改原已經實現的 QUARTZ ,使用 SPRING+ 類反射的機制對原有程式作切面重構;
解決方案:
   1:去官網下載最新的 quartz 解壓 在目錄 docs\dbTables 下就會找到 tables_mysql.sql  檔案、建立資料庫Quartz 並匯入資料庫。

 
  2:生成 quartz.properties 檔案,把它放在工程的 src 目錄下 修改配置檔案如下:
  1. #==============================================================    
  2. #Configure Main Scheduler Properties    
  3. #==============================================================     
  4. org.quartz.scheduler.instanceName = quartzScheduler  
  5. org.quartz.scheduler.instanceId = AUTO  
  6. #==============================================================    
  7. #Configure ThreadPool    
  8. #==============================================================   
  9. org.quartz.threadPool.class = org.quartz.simpl.SimpleThreadPool  
  10. org.quartz.threadPool.threadCount = 10  
  11. org.quartz.threadPool.threadPriority = 5  
  12. org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread = true 
  13. #==============================================================    
  14. #Configure JobStore    
  15. #==============================================================   
  16. org.quartz.jobStore.class = org.quartz.impl.jdbcjobstore.JobStoreTX  
  17. org.quartz.jobStore.driverDelegateClass = org.quartz.impl.jdbcjobstore.StdJDBCDelegate  
  18. org.quartz.jobStore.tablePrefix = QRTZ_  
  19. org.quartz.jobStore.isClustered = true  
  20. org.quartz.jobStore.clusterCheckinInterval = 20000    
  21. org.quartz.jobStore.dataSource = myDS  
  22. #==============================================================    
  23. #Configure DataSource     (此處填你自己的資料庫連線資訊)
  24. #==============================================================   
  25. org.quartz.dataSource.myDS.driver = com.mysql.jdbc.Driver  
  26. org.quartz.dataSource.myDS.URL = jdbc\:mysql\://localhost\:3306/quartz?useUnicode\=true&characterEncoding\=UTF-8  
  27. org.quartz.dataSource.myDS.user = root  
  28. org.quartz.dataSource.myDS.password = 123  
  29. org.quartz.dataSource.myDS.maxConnections =30  
複製程式碼 3:重寫 quartz 的 QuartzJobBean 類 
原因是在使用 quartz+spring 把 quartz 的 task 例項化進入資料庫時,會產生: serializable 的錯誤,原因在於:
這個 MethodInvokingJobDetailFactoryBean 類中的 methodInvoking 方法,是不支援序列化的,因此在把 QUARTZ 的 TASK 序列化進入資料庫時就會拋錯。網上有說把 SPRING 原始碼拿來,修改一下這個方案,然後再打包成 SPRING.jar 釋出,這些都是不好的方法,是不安全的。
必須根據 QuartzJobBean 來重寫一個自己的類 。
BootstrapJob.java: 引導Job,通過Spring容器獲取任務的Job,根據注入的targetJob,該Job必須實現Job2介面
  1. /**
  2. * 引導Job,通過Spring容器獲取任務的Job,根據注入的targetJob,該Job必須實現Job2介面
  3. * @author zzp
  4. * @date 2014-7-7
  5. */
  6. public class BootstrapJob implements Serializable{
  7.         private String targetJob ; 
  8.         public void executeInternal(ApplicationContext cxt) {
  9.                 Job2 job = (Job2)cxt.getBean(this.targetJob);
  10.                 job.executeInternal() ;
  11.         }
  12.         public String getTargetJob() {
  13.                 return targetJob;
  14.         }
  15.         public void setTargetJob(String targetJob) {
  16.                 this.targetJob = targetJob;
  17.         }
  18. }
複製程式碼 Job2.java:

  1. /**
  2. * Quartz 與 Spring 整合時,自定義的Job可以擁有Spring的上下文,
  3. * 因此定義了該介面,自定義的Job需要實現該介面,並實現executeInternal的task,
  4. * 這樣解決了Quartz 與Spring 在叢集環境下,可以不需要序列化,
  5. * 只需要在executeInternal獲取Spring 上下文中的target job bean.
  6. * 呼叫其相關的處理函式,來處理任務
  7. * @author zzp
  8. * @date 2014-7-7
  9. */
  10. public interface Job2 extends Serializable{
  11.         /**
  12.          * 處理任務的核心函式
  13.          * 
  14.          * @param cxt Spring 上下文
  15.          */
  16.         void executeInternal();
  17. }
複製程式碼 重寫 MethodInvokingJobDetailFactoryBean類 方法如下:


  1. <p>public void execute(JobExecutionContext context) throws JobExecutionException
  2.                 {
  3.                         try
  4.                         {
  5.                                 logger.debug("start");
  6.                                 String targetClass = context.getMergedJobDataMap().getString("targetClass");
  7.                                 //logger.debug("targetClass is "+targetClass);
  8.                                 Class targetClassClass = null;
  9.                                 if(targetClass!=null)
  10.                                 {
  11.                                         targetClassClass = Class.forName(targetClass); // Could throw ClassNotFoundException
  12.                                 }
  13.                                 Object targetObject = context.getMergedJobDataMap().get("targetObject");
  14.                                 if(targetObject instanceof BootstrapJob){
  15.                                         //Job2 job = (Job2)targetObject;
  16.                                         //job.executeInternal(context.getScheduler().getContext().)
  17.                                         ApplicationContext ac = (ApplicationContext)context.getScheduler().getContext().get("applicationContext");
  18.                                         BootstrapJob target = (BootstrapJob)targetObject ;
  19.                                         target.executeInternal(ac);
  20.                                 }else{
  21.                                         //logger.debug("targetObject is "+targetObject);
  22.                                         String targetMethod = context.getMergedJobDataMap().getString("targetMethod");
  23.                                         //logger.debug("targetMethod is "+targetMethod);
  24.                                         String staticMethod = context.getMergedJobDataMap().getString("staticMethod");
  25.                                         //logger.debug("staticMethod is "+staticMethod);
  26.                                         Object[] arguments = (Object[])context.getMergedJobDataMap().get("arguments");
  27.                                         //logger.debug("arguments are "+arguments);
  28.                                         //logger.debug("creating MethodInvoker");
  29.                                         MethodInvoker methodInvoker = new MethodInvoker();
  30.                                         methodInvoker.setTargetClass(targetClassClass);
  31.                                         methodInvoker.setTargetObject(targetObject);
  32.                                         methodInvoker.setTargetMethod(targetMethod);
  33.                                         methodInvoker.setStaticMethod(staticMethod);
  34.                                         methodInvoker.setArguments(arguments);
  35.                                         methodInvoker.prepare();
  36.                                         //logger.info("Invoking: "+methodInvoker.getPreparedMethod().toGenericString());
  37.                                         methodInvoker.invoke();
  38.                                 }
  39.                         }
  40.                         catch(Exception e)
  41.                         {
  42.                                 throw new JobExecutionException(e);
  43.                         }
  44.                         finally
  45.                         {
  46.                                 logger.debug("end");
  47.                         }
  48.                 }
  49.         }</p><p>
  50. </p>
複製程式碼 QuartzDeleteQueAction 任務類、一定要實現介面job2、只做參考。
  1. public class QuartzDeleteQueAction implements Job2 {
  2.         private static final long serialVersionUID = 1L;
  3.         private IQuesGroupService quesGroupService;
  4.         public void executeInternal(){     
  5.              LogUtil.jobInfo("Quartz的任務排程執行刪除教研組試題html檔案開始");
  6.              try {
  7.                      ServletContext context = ContextLoader.getCurrentWebApplicationContext().getServletContext();
  8.                  String pathHtml = context.getRealPath(Constants.PATH_HTML);
  9.                      //獲取被刪除試題No
  10.                      List<Object> list =  quesGroupService.queryDeleteQues();
  11.                      for(Object obj:list){
  12.                              String quesName = pathHtml+"ques_"+obj.toString()+".html";
  13.                              FileUtil.delFile(quesName);//刪除無用html檔案
  14.                      }
  15.                 } catch (Exception e) {
  16.                         e.printStackTrace();
  17.                 }
  18.             LogUtil.jobInfo("Quartz的任務排程執行刪除教研組試題html檔案結束");
  19.     }
  20.     public IQuesGroupService getQuesGroupService() {
  21.             return quesGroupService;
  22.     }
  23.     public void setQuesGroupService(IQuesGroupService quesGroupService) {
  24.             this.quesGroupService = quesGroupService;
  25.     }
  26. }
複製程式碼

4:配置 applicationContext-job.xml:


  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3.         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
  4.         xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
  5.         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
  6.             http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd
  7.             http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
  8.             http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd" default-autowire="byName" default-lazy-init="true">
  9.          <!-- 要呼叫的工作類 -->
  10.      <bean id="quartzJob" class="com.web.action.QuartzDeleteQueAction"></bean>
  11.      <!-- 引導Job -->
  12.          <bean id="bootstrapJob" class="com.acts.web.quartz.BootstrapJob">
  13.                 <property name="targetJob" value="quartzJob" />
  14.          </bean>
  15.          <!-- 重寫方法 -->
  16.          <bean id="jobTask" class="com.acts.web.quartz.MethodInvokingJobDetailFactoryBean">
  17.                 <property name="concurrent" value="true" />
  18.                 <property name="targetObject" ref="bootstrapJob" />
  19.          </bean>
  20.      <!-- 定義觸發時間 -->
  21.      <bean id="doTime" class="org.springframework.scheduling.quartz.CronTriggerBean">
  22.             <property name="jobDetail">
  23.                 <ref bean="jobTask"/>
  24.             </property>
  25.             <!-- cron表示式 -->
  26.             <property name="cronExpression">
  27.                 <!--5點到20點 每10分鐘一次排程 -->
  28.                 <value>0 0/10 5-20 * * ?</value>
  29.             </property>
  30.      </bean>
  31.         <!-- 總管理類 如果將lazy-init='false'那麼容器啟動就會執行排程程式 -->
  32.         <bean id="startQuertz" lazy-init="false" autowire="no" class="org.springframework.scheduling.quartz.SchedulerFactoryBean">
  33.             <property name="configLocation" value="classpath:quartz.properties" /> 
  34.             <property name="dataSource" ref="dataSourceQuartz" />
  35.             <property name="triggers">
  36.                 <list>
  37.                     <ref bean="doTime"/>
  38.                 </list>
  39.             </property>
  40.             <!-- 就是下面這句,因為該 bean 只能使用類反射來重構 -->
  41.             <property name="applicationContextSchedulerContextKey" value="applicationContext" /> 
  42.       </bean>
  43. </beans>
複製程式碼
叢集環境下測試:
      三個個節點都帶有 Quartz 任務,監控控制檯、此時只有一臺 quartz 在執行,另幾個節點上的 quartz 沒有執行。
      此時手動 停掉那臺執行 QUARTZ 過了 10分鐘左右,另一個節點的 quartz 自動監測到了叢集中執行著的 quartz 的 instance 已經 停掉 ,因此 quartz 叢集會自動把任一臺可用的 APP上啟動起一個 quartz job 的任務。
     至此 Quartz使用 叢集策略已經ok,不用改原有程式碼,配置一下我們就可做到 Quartz的叢集與自動錯誤冗餘。




所需jar包:quartz-all-1.6.6.jar   spring.jar  mysql-connector-java-3.1.11-bin.jar  commons-pool-1.3.jar  commons-logging-1.0.4.jar  commons-dbcp-1.2.1.jar