每次想多執行緒處理一個大的結果集的時候 都需要寫一大堆程式碼,自己寫了個工具類 方便使用

package com.guige.fss.common.util;

import com.guige.fss.common.exception.BusinessException;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils; import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future; /**
* Created by admin on 2018/6/5.
* @author 宋安偉
*/
public class ThreadUtil {
//建立定長執行緒池,初始化執行緒
private static Logger log = LoggerFactory.getLogger(ThreadUtil.class); /**
* 對List進行多執行緒處理(限制 對List只讀 如果想修改List 可以處理完畢後把要修改或刪除的List返回 多執行緒執行完後再修改或刪除)
* @param list 要處理的List
* @param threadSize 用幾個執行緒處理
* @param threadLoadback 處理的回撥(具體業務員)
* @param <T> 每個回撥的返回結果
* @param <V> List<V>的泛型
* @return
*/
public static <T,V>List<T> executorsTasks(final List<V> list,final int threadSize,final ThreadLoadback<T,V> threadLoadback){
// 開始時間
long start = System.currentTimeMillis();
// 總資料條數
int dataSize = list.size();
// 執行緒數
int threadNum = dataSize / threadSize + 1;
// 定義標記,過濾threadNum為整數
boolean special = dataSize % threadSize == 0;
// 建立一個執行緒池
ExecutorService exec = Executors.newFixedThreadPool(threadNum);
// 定義一個任務集合
List<Callable<T>> tasks = new ArrayList<Callable<T>>();
Callable<T> task = null;
List cutList = null; for (int i = 0; i < threadNum; i++) {
if (i == threadNum - 1) {
if (special) {
break;
}
cutList = list.subList(threadSize * i, dataSize);
} else {
cutList = list.subList(threadSize * i, threadSize * (i + 1));
}
// System.out.println("第" + (i + 1) + "組:" + cutList.toString());
final List listStr = cutList;
task = new Callable<T>() {
@Override
public T call() throws Exception {
// System.out.println(Thread.currentThread().getName() + "執行緒:" + listStr);
return (T) threadLoadback.load(listStr);
// return }
};
// 這裡提交的任務容器列表和返回的Future列表存在順序對應的關係
tasks.add(task);
}
List<Future<T>> resultsFuture = null;
try {
log.debug("執行緒任務執行開始:任務數"+tasks.size());
resultsFuture = exec.invokeAll(tasks);
List<T> results = new ArrayList<>();
for (Future<T> future : resultsFuture) {
T result=future.get();
if(result!=null) {
results.add(result);
}
}
return results; } catch (Exception e) {
e.printStackTrace();
throw new BusinessException(e.getMessage());
}finally {
// 關閉執行緒池
exec.shutdown();
log.debug("執行緒任務執行結束");
log.debug("執行任務消耗了 :" + (System.currentTimeMillis() - start) + "毫秒");
} } interface ThreadLoadback<T,V> {
T load(List<V> list) throws Exception;
} public static void main(String[] args) {
List<String> list = new ArrayList<>();
for(int i=0;i<1000;i++){
list.add("i="+i);
}
List<List<Integer>> resultList= ThreadUtil.executorsTasks(list, 10, new ThreadLoadback<List<Integer>, String>() {
@Override
public List<Integer> load(List<String> list) throws Exception {
List<Integer> result= new ArrayList<>();
for(String str:list){
str= str.replaceAll("i=","");
result.add(Integer.parseInt(str));
System.out.println(Thread.currentThread().getName()+"休息1秒");
Thread.sleep(1000L);
}
return result;
}
});
if(!CollectionUtils.isEmpty(resultList)){
List<Integer> integers = new ArrayList<>();
resultList.stream().forEach(items -> {
if (!CollectionUtils.isEmpty(resultList)) {
items.stream().forEach(item -> {
integers.add(item); });
}
}
);
integers.stream().forEach(item->System.out.println(item)); }
} }