1. 程式人生 > >對純for迴圈進行並行流和執行緒池優化簡記

對純for迴圈進行並行流和執行緒池優化簡記

最近在專案中需要根據角色拿對應任務(activiti 中的內容),再根據任務拿工單集(任務跟工單為多對一的關係,所以還需過濾掉重複的工單),獲得最終結果集的耗時較大,所以考慮從多執行緒方向優化。

這些操作在一個for迴圈裡涉及兩處資料庫查詢IO,但IO的阻塞不嚴重,不屬於IO密集型的多執行緒設計條件,所以採用執行緒池,優化的效果不是很好(對於IO阻塞係數較小的應用優化,多執行緒的使用,CPU線上程非阻塞的狀態下切換上下文,消耗很大,同時還得加上建立執行緒的開銷);採用concurrent包的Stream來進行併發流式的處理,優化效果還比較可觀。

現針對三種不同的實現方式和執行效果通列如下:

原始for迴圈:

//同組測試資料,2038ms 結果集穩定

方法體:

List<WorkOrder> content = new ArrayList<WorkOrder>();
Map<String,Integer> widMap=new HashMap<String,Integer>();

for (String id : ids) {
List<Task> tasks = taskService.createTaskQuery()
.taskCandidateGroup(id).active().list();

//同組測試資料,2038ms 結果集穩定
for (Task task : tasks) {
ProcessInstance pi = runtimeService
.createProcessInstanceQuery()
.processInstanceId(task.getProcessInstanceId())
.singleResult();
String workOrderId = pi.getBusinessKey();
if(!widMap.containsKey(workOrderId)){
WorkOrder wo = workOrderService.findStartedWorkOrderAnd(
workOrderId, Constants.WORKORDER.CONFIGURED,
Constants.WORKORDER.INPROCESSED,
Constants.WORKORDER.UNFILLED);
if(wo!=null){
content.add(wo);
}
widMap.put(workOrderId, 1);
}
}

}

並行流:

//並行流,712ms 結果集穩定

方法體:

List<WorkOrder> content = new ArrayList<WorkOrder>();
Map<String,Integer> widMap=new HashMap<String,Integer>();

for (String id : ids) {
List<Task> tasks = taskService.createTaskQuery()
.taskCandidateGroup(id).active().list();

//並行流,712ms 結果集穩定
tasks.parallelStream().map(t->{
final String wid=runtimeService.createProcessInstanceQuery()
.processInstanceId(t.getProcessInstanceId())
.singleResult()
.getBusinessKey();
if(!widMap.containsKey(wid)){
WorkOrder wo = workOrderService.findStartedWorkOrderAnd(
wid, Constants.WORKORDER.CONFIGURED,
Constants.WORKORDER.INPROCESSED,
Constants.WORKORDER.UNFILLED);
if(wo!=null){
content.add(wo);
}
widMap.put(wid, 1);
}
return 1;
}
).count();

}

spring 下的非同步執行緒池:

//執行緒池,2627ms ,結果集不穩定

方法體:

BlockingDeque<Future<WorkOrder>> resultContainer=new LinkedBlockingDeque<Future<WorkOrder>>();

Date start =new Date();
for (String id : ids) {
List<Task> tasks = taskService.createTaskQuery()
.taskCandidateGroup(id).active().list();
//執行緒池,2627ms ,結果集不穩定(最終結果可能與實際不符,存在漏資料,無資料現象)
int i=0;
for(Task task:tasks){
try {
resultContainer.put(workOrderAsynHander.gainWorkOrderByTask(task, id));
i++;
System.out.println("put successful,thread "+i+"executed");
} catch (InterruptedException e) {

e.printStackTrace();
}
}
int j=0;
while(!resultContainer.isEmpty()){
try {
Future<WorkOrder> wf=resultContainer.remove();
WorkOrder w= wf == null ? null : wf.get();
if(w!=null){
content.add(w);
}
j++;
System.out.println("get successful,thread "+j+"executed");
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

非同步的方法類:

import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


import org.activiti.engine.RuntimeService;
import org.activiti.engine.TaskService;
import org.activiti.engine.task.Task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;


@Component
public class WorkOrderAsynHander {
private static final ConcurrentMap<String,Object> workOrderIds=new ConcurrentHashMap<String,Object>();
private static Lock lock=new ReentrantLock();
@Autowired
private RuntimeService runtimeService;
@Autowired
private WorkOrderService workOrderService;

@Async("extraTaskAsynPool")
public Future<WorkOrder> gainWorkOrderByTask(Task task,String id){


String wid=runtimeService.createProcessInstanceQuery()
.processInstanceId(task.getProcessInstanceId())
.singleResult()
.getBusinessKey();
if(!workOrderIds.containsKey(wid)){
workOrderIds.put(wid, 1);
WorkOrder wo = workOrderService.findStartedWorkOrderAnd(
wid, Constants.WORKORDER.CONFIGURED,
Constants.WORKORDER.INPROCESSED,
Constants.WORKORDER.UNFILLED);
return new AsyncResult<WorkOrder>(wo);
}


return null;
}
}

執行緒池定義:

import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration  
@EnableAsync  
public class TaskExecutorPool {

@Value("${extraTaskAsynPool.corePoolSize}")
private int corePoolSize;
@Value("${extraTaskAsynPool.maxPoolSize}")
private int maxPoolSize;
@Value("${extraTaskAsynPool.queueCapacity}")
private int queueCapacity;
@Value("${extraTaskAsynPool.keepAliveSeconds}")
private int keepAliveSeconds;

@Bean
public Executor extraTaskAsynPool(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();  
        executor.setCorePoolSize(corePoolSize);  
        executor.setMaxPoolSize(maxPoolSize);  
        executor.setQueueCapacity(queueCapacity);  
        executor.setKeepAliveSeconds(keepAliveSeconds);  
        executor.setThreadNamePrefix("extra_");  
  
        // rejection-policy:當pool已經達到max size的時候,處理新任務策略  
        // CALLER_RUNS:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行  
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());  
        executor.initialize();  
        return executor; 
}
}

spring boot  下的非同步啟動配置:

@SpringBootApplication
@EnableScheduling
@EnableAsync
public class MESApplication {
public static void main(String[] args) {
SpringApplication.run(MESApplication.class, args);
}

}