1. 程式人生 > >一份基於quartz的任務多執行緒處理模板

一份基於quartz的任務多執行緒處理模板

本任務處理模板使用maven管理具體jar包依賴,使用quartz2.2.2搭建的一個定時任務處理模板,模板提供了一個CommonJob類用於quartz呼叫,此類的作用是處理任務模板類,規定了處理任務的步驟為:①獲取待處理任務列表;②遍歷待處理任務列表,逐一進行處理。然後只需要注入一個具體的任務類,此任務類可注入相應的業務處理service,service需實現CommonJobService中各個方法,service中相應方法可宣告事物,以便模板類呼叫完成具體任務處理流程。


具體XML配置如下:

<?xml version="1.0" encoding="UTF-8"?>  
<beans xmlns="http://www.springframework.org/schema/beans"  
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
                            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd">  
   
    <context:component-scan base-package="com.defonds.scheduler" />  
    
    
    <!-- 具體的任務action需要實現commonJobService內各方法 -->
    <bean id="fileReadTask" class="com.action.FileReadTaskAction">
    	<property name="fileReadService" ref="fileReadService"></property>
    </bean>
    
    <bean id="fileReadService" class="com.service.impl.FileReadServiceImpl">
    </bean>
   
    <bean id="commonJob" class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean">  
        <property name="targetObject">
        	<!-- 為commonJob模板注入具體的任務action -->
        	<bean class="com.job.CommonJob">
        		<property name="autoTaskAction" ref="fileReadTask"></property>
        		<property name="threadPoolSize" value="5"></property>
        	</bean>
        </property>
		<property name="targetMethod" value="execute"></property>  
    </bean>  
   
    <!-- Run the job every 5 seconds -->  
    <bean id="cronTrigger"  class="org.springframework.scheduling.quartz.CronTriggerFactoryBean">  
        <property name="jobDetail" ref="commonJob" />  
        <property name="cronExpression" value="0/5 * * * * ?" />        
    </bean>  
   
   
    <!-- Scheduler factory bean to glue together jobDetails and triggers to Configure Quartz Scheduler -->  
    <bean  class="org.springframework.scheduling.quartz.SchedulerFactoryBean">  
        <property name="jobDetails">  
            <list>  
                <ref bean="commonJob" />  
            </list>  
        </property>  
        <property name="triggers">  
            <list>  
                <ref bean="cronTrigger" />  
            </list>  
        </property>  
    </bean>  
   
</beans>  

commonJob類程式碼如下:

package com.job;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import com.common.AutoTask;
import com.common.Transaction;

public class CommonJob{
	
	private static Logger logger = LoggerFactory.getLogger(CommonJob.class);
	private String threadPoolSize;
	private AutoTask autoTaskAction;// 注入的任務
	private Map dupMap = new ConcurrentHashMap();
	
	// 定時任務執行入口方法
	public void execute() throws Exception{
		
		List<Object> list = autoTaskAction.fetchData();// 獲取待處理狀態任務資料
		ExecutorService threadPool = Executors.newFixedThreadPool(Integer.parseInt(threadPoolSize));
		
		// 遍歷獲取的待處理任務列表
		for(Object data:list){
			int uniqueId = autoTaskAction.getUniqueId(data);
			// 對任務做防重處理
			if(checkDup(uniqueId)){
				logger.debug("This transaction is under processing now : "+uniqueId);
				continue;// 若任務已在 處理,則暫不處理該任務,開始處理下一個任務
			}
			
			//檢查任務是否為待處理狀態(任務為狀態驅動,再次確認任務是否為待處理狀態,若是則繼續處理,否則說明已正在被處理或已處理完)
			if(!checkStatus(uniqueId)){
				logger.debug("This transaction is not in wait-process status : "+uniqueId);
				continue;
			}
			// 將任務包裝為Runnable
			RunnableTask task = new RunnableTask(data);
			// 提交給執行緒池執行
			threadPool.execute(task);
			
		}
		
		threadPool.shutdown();
		
	}
	
	protected boolean checkDup(int uniqueId){
		// job類內部維護一個防重表,若無此任務ID,則放置入Map,若已存在,則說明任務已在處理
		if(dupMap.get(uniqueId) == null){
			dupMap.put(uniqueId, "");
			return false;
		}else{
			return true;
		}
	}
	
	protected boolean checkStatus(int uniqueId){
		//根據ID獲取任務詳情
		Transaction t = autoTaskAction.loadTransactionById(uniqueId);
		//若任務狀態仍未待處理則檢查成功,繼續處理任務,否則false,不處理該任務
		if("wait_process".equals(t.getStatus())){
			return true;
		}else{
			return false;
		}
		
	}
	
	public class RunnableTask implements Runnable{
		private Object data;
		// 構造方法需將任務實體傳入
		public RunnableTask(Object data){
			this.data = data;
		}
		public void run() {
			int uniqueId = autoTaskAction.getUniqueId(data);
			try{
				// 呼叫注入的任務類處理任務方法處理任務
				autoTaskAction.deal(data);
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				// 處理完任務或丟擲異常時均在防重map中將任務ID刪除
				dupMap.remove(uniqueId);
			}
		}	
	}

	public void setThreadPoolSize(String threadPoolSize) {
		threadPoolSize = threadPoolSize;
	}

	public void setAutoTaskAction(AutoTask autoTaskAction) {
		this.autoTaskAction = autoTaskAction;
	}
	
}

該類提供了任務處理模板,其中execute方法為quartz定時任務入口方法(xml中設定),負責獲取待處理任務列表,初始化執行緒池,遍歷待處理任務列表,依次處理。其中我們將任務處理方法封裝進內部Runnable類,以便利用執行緒池更快處理任務列表。任務處理時注意防重及任務狀態確認,防止重複處理

以下為具體任務處理action及介面方法類:

package com.action;

import java.util.List;

import com.common.AutoTask;
import com.common.Transaction;
import com.service.CommonJobService;

public class FileReadTaskAction implements AutoTask{
	
	private CommonJobService fileReadService;

	public List<Object> fetchData() {
		List<Object> list = fileReadService.fetchData();
		return list;
	}

	public int getUniqueId(Object data) {
		Transaction t = (Transaction)data;
		return t.getTransId();
	}

	public void deal(Object data) {	
		fileReadService.deal(data);
	}

	public Transaction loadTransactionById(int transId) {
		
		return (Transaction)fileReadService.loadTransactionById(transId);
	}
	
	public void setFileReadService(CommonJobService fileReadService) {
		this.fileReadService = fileReadService;
	}

}
package com.common;

import java.util.List;

public interface AutoTask {

	public List<Object> fetchData();
	
	public int getUniqueId(Object data);
	
	public void deal(Object data);
	
	public Transaction loadTransactionById(int transId);
	
}
package com.service;

import java.util.List;

import com.common.Transaction;

public interface CommonJobService {
	
	public List<Object> fetchData();
	
	public void deal(Object data);
	
	public Transaction loadTransactionById(int transId);

}

最後為quartz的IOC容器啟動類,通過run as java application即可實現定時處理任務

package com.auto;

import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

public class App 
{
    public static void main( String[] args )
    {
    	ApplicationContext applicationContext = null;
		try {
			//獲取配置檔案,啟動IOC容器
			applicationContext = new ClassPathXmlApplicationContext("classpath*:quartz.xml");

			System.out.println("AutoHandler is started successfully!!");
		} catch (Exception e) {
			e.printStackTrace();
		}
    }
}


後修正了CommonJob.java的執行緒池生成方式,實現了InitializingBean介面,通過註解@value注入執行緒池PoolSize引數,讓他在bean剛開始例項化後即產生執行緒池,完成任務列表後無需關閉執行緒池,等待下次任務週期再次複用即可。這樣整個bean的生命週期只用建立一次固定大小執行緒池,省去了多次建立、銷燬執行緒池的開銷。程式碼如下:

package com.job;

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Value;

import com.common.AutoTask;
import com.common.Transaction;

public class CommonJob implements InitializingBean{
	
	private static Logger logger = LoggerFactory.getLogger(CommonJob.class);
	@Value("${commonJob.threadPoolSize}")
	private String threadPoolSize;
	private AutoTask autoTaskAction;// 注入的任務
	private Map dupMap = new ConcurrentHashMap();
	private ExecutorService threadPool;
	
	// 定時任務執行入口方法
	public void execute() throws Exception{
		
		List<Object> list = autoTaskAction.fetchData();// 獲取待處理狀態任務資料
		//ExecutorService threadPool = Executors.newFixedThreadPool(Integer.parseInt(threadPoolSize));
		
		// 遍歷獲取的待處理任務列表
		for(Object data:list){
			int uniqueId = autoTaskAction.getUniqueId(data);
			// 對任務做防重處理
			if(checkDup(uniqueId)){
				logger.debug("This transaction is under processing now : "+uniqueId);
				continue;// 若任務已在 處理,則暫不處理該任務,開始處理下一個任務
			}
			
			//檢查任務是否為待處理狀態(任務為狀態驅動,再次確認任務是否為待處理狀態,若是則繼續處理,否則說明已正在被處理或已處理完)
			if(!checkStatus(uniqueId)){
				logger.debug("This transaction is not in wait-process status : "+uniqueId);
				continue;
			}
			// 將任務包裝為Runnable
			RunnableTask task = new RunnableTask(data);
			// 提交給執行緒池執行
			threadPool.execute(task);
			
		}
		
	}
	
	protected boolean checkDup(int uniqueId){
		// job類內部維護一個防重表,若無此任務ID,則放置入Map,若已存在,則說明任務已在處理
		if(dupMap.get(uniqueId) == null){
			dupMap.put(uniqueId, "");
			return false;
		}else{
			return true;
		}
	}
	
	protected boolean checkStatus(int uniqueId){
		//根據ID獲取任務詳情
		Transaction t = autoTaskAction.loadTransactionById(uniqueId);
		//若任務狀態仍未待處理則檢查成功,繼續處理任務,否則false,不處理該任務
		if("wait_process".equals(t.getStatus())){
			return true;
		}else{
			return false;
		}
		
	}
	
	public class RunnableTask implements Runnable{
		private Object data;
		// 構造方法需將任務實體傳入
		public RunnableTask(Object data){
			this.data = data;
		}
		public void run() {
			int uniqueId = autoTaskAction.getUniqueId(data);
			try{
				// 呼叫注入的任務類處理任務方法處理任務
				autoTaskAction.deal(data);
			}catch(Exception e){
				e.printStackTrace();
			}finally{
				// 處理完任務或丟擲異常時均在防重map中將任務ID刪除
				dupMap.remove(uniqueId);
			}
		}	
	}

	/*public void setThreadPoolSize(String threadPoolSize) {
		threadPoolSize = threadPoolSize;
	}*/

	public void setAutoTaskAction(AutoTask autoTaskAction) {
		this.autoTaskAction = autoTaskAction;
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		threadPool = Executors.newFixedThreadPool(Integer.parseInt(threadPoolSize));
		
	}
	
}