1. 程式人生 > >ThreadUtil 多執行緒處理List,回撥處理具體的任務

ThreadUtil 多執行緒處理List,回撥處理具體的任務

每次想多執行緒處理一個大的結果集的時候 都需要寫一大堆程式碼,自己寫了個工具類 方便使用

package com.guige.fss.common.util;


import com.guige.fss.common.exception.BusinessException;
import io.swagger.models.auth.In;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import java.util.ArrayList; import java.util.List; import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * Created by admin on 2018/6/5. * @author 宋安偉 */ public class ThreadUtil { //建立定長執行緒池,初始化執行緒
private static Logger log = LoggerFactory.getLogger(ThreadUtil.class); /** * 對List進行多執行緒處理(限制 對List只讀 如果想修改List 可以處理完畢後把要修改或刪除的List返回 多執行緒執行完後再修改或刪除) * @param list 要處理的List * @param threadSize 用幾個執行緒處理 * @param threadLoadback 處理的回撥(具體業務員) * @param <T> 每個回撥的返回結果 *
@param <V> List<V>的泛型 * @return */ public static <T,V>List<T> executorsTasks(final List<V> list,final int threadSize,final ThreadLoadback<T,V> threadLoadback){ // 開始時間 long start = System.currentTimeMillis(); // 總資料條數 int dataSize = list.size(); // 執行緒數 int threadNum = dataSize / threadSize + 1; // 定義標記,過濾threadNum為整數 boolean special = dataSize % threadSize == 0; // 建立一個執行緒池 ExecutorService exec = Executors.newFixedThreadPool(threadNum); // 定義一個任務集合 List<Callable<T>> tasks = new ArrayList<Callable<T>>(); Callable<T> task = null; List cutList = null; for (int i = 0; i < threadNum; i++) { if (i == threadNum - 1) { if (special) { break; } cutList = list.subList(threadSize * i, dataSize); } else { cutList = list.subList(threadSize * i, threadSize * (i + 1)); } // System.out.println("第" + (i + 1) + "組:" + cutList.toString()); final List listStr = cutList; task = new Callable<T>() { @Override public T call() throws Exception { // System.out.println(Thread.currentThread().getName() + "執行緒:" + listStr); return (T) threadLoadback.load(listStr); // return } }; // 這裡提交的任務容器列表和返回的Future列表存在順序對應的關係 tasks.add(task); } List<Future<T>> resultsFuture = null; try { log.debug("執行緒任務執行開始:任務數"+tasks.size()); resultsFuture = exec.invokeAll(tasks); List<T> results = new ArrayList<>(); for (Future<T> future : resultsFuture) { T result=future.get(); if(result!=null) { results.add(result); } } return results; } catch (Exception e) { e.printStackTrace(); throw new BusinessException(e.getMessage()); }finally { // 關閉執行緒池 exec.shutdown(); log.debug("執行緒任務執行結束"); log.debug("執行任務消耗了 :" + (System.currentTimeMillis() - start) + "毫秒"); } } interface ThreadLoadback<T,V> { T load(List<V> list) throws Exception; } public static void main(String[] args) { List<String> list = new ArrayList<>(); for(int i=0;i<1000;i++){ list.add("i="+i); } List<List<Integer>> resultList= ThreadUtil.executorsTasks(list, 10, new ThreadLoadback<List<Integer>, String>() { @Override public List<Integer> load(List<String> list) throws Exception { List<Integer> result= new ArrayList<>(); for(String str:list){ str= str.replaceAll("i=",""); result.add(Integer.parseInt(str)); System.out.println(Thread.currentThread().getName()+"休息1秒"); Thread.sleep(1000L); } return result; } }); if(!CollectionUtils.isEmpty(resultList)){ List<Integer> integers = new ArrayList<>(); resultList.stream().forEach(items -> { if (!CollectionUtils.isEmpty(resultList)) { items.stream().forEach(item -> { integers.add(item); }); } } ); integers.stream().forEach(item->System.out.println(item)); } } }