對純for迴圈進行並行流和執行緒池優化簡記
最近在專案中需要根據角色拿對應任務(activiti 中的內容),再根據任務拿工單集(任務跟工單為多對一的關係,所以還需過濾掉重複的工單),獲得最終結果集的耗時較大,所以考慮從多執行緒方向優化。
這些操作在一個for迴圈裡涉及兩處資料庫查詢IO,但IO的阻塞不嚴重,不屬於IO密集型的多執行緒設計條件,所以採用執行緒池,優化的效果不是很好(對於IO阻塞係數較小的應用優化,多執行緒的使用,CPU線上程非阻塞的狀態下切換上下文,消耗很大,同時還得加上建立執行緒的開銷);採用concurrent包的Stream來進行併發流式的處理,優化效果還比較可觀。
現針對三種不同的實現方式和執行效果通列如下:
原始for迴圈:
//同組測試資料,2038ms 結果集穩定
方法體:
List<WorkOrder> content = new ArrayList<WorkOrder>();
Map<String,Integer> widMap=new HashMap<String,Integer>();
for (String id : ids) {
List<Task> tasks = taskService.createTaskQuery()
.taskCandidateGroup(id).active().list();
//同組測試資料,2038ms 結果集穩定
for (Task task : tasks) {
ProcessInstance pi = runtimeService
.createProcessInstanceQuery()
.processInstanceId(task.getProcessInstanceId())
.singleResult();
String workOrderId = pi.getBusinessKey();
if(!widMap.containsKey(workOrderId)){
WorkOrder wo = workOrderService.findStartedWorkOrderAnd(
workOrderId, Constants.WORKORDER.CONFIGURED,
Constants.WORKORDER.INPROCESSED,
Constants.WORKORDER.UNFILLED);
if(wo!=null){
content.add(wo);
}
widMap.put(workOrderId, 1);
}
}
}
並行流:
//並行流,712ms 結果集穩定
方法體:
List<WorkOrder> content = new ArrayList<WorkOrder>();
Map<String,Integer> widMap=new HashMap<String,Integer>();
for (String id : ids) {
List<Task> tasks = taskService.createTaskQuery()
.taskCandidateGroup(id).active().list();
//並行流,712ms 結果集穩定
tasks.parallelStream().map(t->{
final String wid=runtimeService.createProcessInstanceQuery()
.processInstanceId(t.getProcessInstanceId())
.singleResult()
.getBusinessKey();
if(!widMap.containsKey(wid)){
WorkOrder wo = workOrderService.findStartedWorkOrderAnd(
wid, Constants.WORKORDER.CONFIGURED,
Constants.WORKORDER.INPROCESSED,
Constants.WORKORDER.UNFILLED);
if(wo!=null){
content.add(wo);
}
widMap.put(wid, 1);
}
return 1;
}
).count();
}
spring 下的非同步執行緒池:
//執行緒池,2627ms ,結果集不穩定
方法體:
BlockingDeque<Future<WorkOrder>> resultContainer=new LinkedBlockingDeque<Future<WorkOrder>>();
Date start =new Date();
for (String id : ids) {
List<Task> tasks = taskService.createTaskQuery()
.taskCandidateGroup(id).active().list();
//執行緒池,2627ms ,結果集不穩定(最終結果可能與實際不符,存在漏資料,無資料現象)
int i=0;
for(Task task:tasks){
try {
resultContainer.put(workOrderAsynHander.gainWorkOrderByTask(task, id));
i++;
System.out.println("put successful,thread "+i+"executed");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
int j=0;
while(!resultContainer.isEmpty()){
try {
Future<WorkOrder> wf=resultContainer.remove();
WorkOrder w= wf == null ? null : wf.get();
if(w!=null){
content.add(w);
}
j++;
System.out.println("get successful,thread "+j+"executed");
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
非同步的方法類:
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.TaskService;
import org.activiti.engine.task.Task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;
@Component
public class WorkOrderAsynHander {
private static final ConcurrentMap<String,Object> workOrderIds=new ConcurrentHashMap<String,Object>();
private static Lock lock=new ReentrantLock();
@Autowired
private RuntimeService runtimeService;
@Autowired
private WorkOrderService workOrderService;
@Async("extraTaskAsynPool")
public Future<WorkOrder> gainWorkOrderByTask(Task task,String id){
String wid=runtimeService.createProcessInstanceQuery()
.processInstanceId(task.getProcessInstanceId())
.singleResult()
.getBusinessKey();
if(!workOrderIds.containsKey(wid)){
workOrderIds.put(wid, 1);
WorkOrder wo = workOrderService.findStartedWorkOrderAnd(
wid, Constants.WORKORDER.CONFIGURED,
Constants.WORKORDER.INPROCESSED,
Constants.WORKORDER.UNFILLED);
return new AsyncResult<WorkOrder>(wo);
}
return null;
}
}
執行緒池定義:
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
@EnableAsync
public class TaskExecutorPool {
@Value("${extraTaskAsynPool.corePoolSize}")
private int corePoolSize;
@Value("${extraTaskAsynPool.maxPoolSize}")
private int maxPoolSize;
@Value("${extraTaskAsynPool.queueCapacity}")
private int queueCapacity;
@Value("${extraTaskAsynPool.keepAliveSeconds}")
private int keepAliveSeconds;
@Bean
public Executor extraTaskAsynPool(){
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(corePoolSize);
executor.setMaxPoolSize(maxPoolSize);
executor.setQueueCapacity(queueCapacity);
executor.setKeepAliveSeconds(keepAliveSeconds);
executor.setThreadNamePrefix("extra_");
// rejection-policy:當pool已經達到max size的時候,處理新任務策略
// CALLER_RUNS:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}
spring boot 下的非同步啟動配置:
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class MESApplication {
public static void main(String[] args) {
SpringApplication.run(MESApplication.class, args);
}
}