切分大任務成多個子任務(事務),匯總後統一提交或回滾
阿新 • • 發佈:2018-09-22
batch target math frame 調用示例 turn exceptio rri action 示例代碼可以從github上獲取 https://github.com/git-simm/simm-framework.git
一、業務場景:
系統中存在一個盤庫的功能,用戶一次盤庫形成一兩萬條的盤庫明細單,一次性提交給服務器進行處理。服務器性能比較優越,平均也得運行30秒左右。性能上需要進行優化。
二、處理方案:
做過代碼分析後,發現單線程邏輯沒有什麽優化空間。開始考慮引入多線程處理模型,用10個子線程進行任務切分處理。切分子線程問題需要考慮事務的一致性。10個子線程對應10個事務,需要保證所有事務一起提交或一起回滾。這裏使用synchronized(wait,notifyall)機制做線程協作。
三、代碼實現:
3.1、添加一個多線程協作標誌類,用於做子線程運行狀態統計,通知子線程做事務提交還是回滾的操作;
package simm.framework.threadutils.multi; import java.util.UUID; /** * 多線程結束標誌 * 2018.09.22 by simm */ public class MultiEndFlag { private volatile boolean fired = false; //是否執行成功 private volatile boolean isAllSuccess = false; privatevolatile int threadCount = 0; private volatile int failCount = 0; /** * 初始化子線程的總數 * @param count */ public MultiEndFlag(int count){ threadCount = count; } public boolean isAllSuccess() { return isAllSuccess; } /** * 等待全部結束 * @param threadId *@param result */ public synchronized void waitForEnd(UUID threadId,int result){ //統計失敗的線程個數 if(result==0){ failCount++; } threadCount--; while (!fired){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 執行結束通知 */ public synchronized void go(){ fired = true; //結果都顯示成功 isAllSuccess = (failCount == 0); notifyAll(); } /** * 等待結束 */ public void end(){ while (threadCount > 0){ waitFunc(50); } System.out.println("線程全部執行完畢通知"); go(); } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }
3.2、提供一個數據保存服務的接口定義,一個默認的子線程任務執行類(需要接收數據保存服務實現,業務數據,協作標誌變量);
package simm.framework.threadutils.multi; import java.util.List; import java.util.UUID; /** * 保存服務接口 * 2018.09.22 by simm * @param <T> */ public interface ISaveService<T> { /** * 子線程批量保存方法 * @param list * @param endFlag * @param threadId * @return * @throws Exception */ Integer batchSave(List<T> list, MultiEndFlag endFlag, UUID threadId) throws Exception; }
package simm.framework.threadutils.multi; import java.util.List; import java.util.UUID; import java.util.concurrent.Callable; /** * 默認的執行任務 * 2018.09.22 by simm */ public class DefaultExecTask<T> implements Callable<Integer> { private List<T> list; private ISaveService saveService; private MultiEndFlag endFlag; private UUID threadId; /** * 盤庫子任務 * @param saveService * @param notes * @param flag */ public DefaultExecTask(ISaveService saveService, List<T> notes, MultiEndFlag flag){ this.saveService = saveService; this.list = notes; this.endFlag = flag; this.threadId = UUID.randomUUID(); } @Override public Integer call() throws Exception { return saveService.batchSave(this.list,this.endFlag,this.threadId); } }
3.3、實現最核心的線程池分發子線程,並匯總結果通知子線程事務做最終的提交或回滾。線程池使用定長池 newFixedThreadPool,子線程使用futureTask,可接收返回值和異常信息。
package simm.framework.threadutils.multi; import java.util.ArrayList; import java.util.List; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * 多線程切分執行器 * 2018.09.22 by simm */ public class MultiExecutor { private static int maxThreadCount = 10; /** * 執行方法(分批創建子線程) * @param saveService * @param notes * @param groupLen * @return * @throws ExecutionException * @throws InterruptedException */ public static <T> Boolean exec(ISaveService saveService,List<T> notes,int groupLen) throws ExecutionException, InterruptedException { if(notes==null || notes.size()==0) return true; //創建一個線程池,最大10個線程 ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount); List<Future<Integer>> futures = new ArrayList<>(); int noteSize = notes.size(); int batches = (int) Math.ceil(noteSize * 1.0 /groupLen); //分組超長最大線程限制,則設置分組數為10,計算分組集合尺寸 if(batches>maxThreadCount){ batches = maxThreadCount; groupLen = (int) Math.ceil(noteSize * 1.0 /batches); } System.out.println("總長度:"+noteSize+" 批次信息:"+batches+" 分組長度:"+groupLen); MultiEndFlag flag = new MultiEndFlag(batches); int startIndex, toIndex, maxIndex = notes.size(); for(int i=0;i<batches;i++){ startIndex = i * groupLen; toIndex = startIndex + groupLen; if(toIndex> maxIndex) { toIndex = maxIndex; } List<T> temp = notes.subList(startIndex,toIndex); if(temp == null || temp.size()==0) continue; futures.add(executorService.submit(new DefaultExecTask(saveService,temp,flag))); } flag.end(); //子線程全部等待返回(存在異常,則直接拋向主線程) for(Future<Integer> future:futures){ future.get(); } //所有線程返回後,關閉線程池 executorService.shutdown(); return true; } }四、給出一個調用偽代碼。需要註意的一點,子線程開啟事務,這裏使用@Transactional聲明式事務,這要求服務的實體類需要通過spring的bean工廠創建,得到一個動態代理類,以達到支持事務攔截器的目的,保證註解的有效性。
package multi; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; import simm.framework.threadutils.multi.DefaultExecTask; import simm.framework.threadutils.multi.ISaveService; import simm.framework.threadutils.multi.MultiEndFlag; import simm.framework.threadutils.multi.MultiExecutor; import java.util.ArrayList; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; /** * 損益單保存服務 */ @Service public class DemoService implements ISaveService<NoteCheckBalance> { private static final Logger logger = LoggerFactory.getLogger(DefaultExecTask.class); @Autowired private NoteCheckBalanceMapper noteCheckBalanceMapper; /** * 業務保存 * @param list */ public void save(List<NoteCheckBalance> list){ for(NoteCheckBalance item :list){ noteCheckBalanceMapper.insert(item); } } /** * 批量保存事件 */ @Transactional(rollbackFor = Exception.class) @Override public Integer batchSave(List<NoteCheckBalance> list, MultiEndFlag endFlag, UUID threadId) throws Exception { int result = 0; try{ //業務操作 save(list); result = 1; //進行waitForEnd 操作,是為了確保所有的線程都最終通知同步協作標誌 endFlag.waitForEnd(threadId ,result); //其他線程異常手工回滾 if(result==1 && !endFlag.isAllSuccess()){ String message = "子線程未全部執行成功,對線程["+threadId+"]進行回滾"; throw new Exception(message); } return result; }catch (Exception ex){ logger.error(ex.toString()); if(result ==0){ //本身線程異常拋出異常,通知已經做完(判斷是為了防止 與 try塊中的通知重復) endFlag.waitForEnd(threadId ,result); } throw ex; } } /** * 調用示例 * @param args * @throws ExecutionException * @throws InterruptedException */ public static void main(String[] args) throws ExecutionException, InterruptedException { //調用示例 MultiExecutor.exec(new DemoService(), new ArrayList<NoteCheckBalance>(),500); } }參考文章 https://www.cnblogs.com/LipeiNet/p/6475851.html
切分大任務成多個子任務(事務),匯總後統一提交或回滾