1. 程式人生 > >切分大任務成多個子任務(事務),匯總後統一提交或回滾

切分大任務成多個子任務(事務),匯總後統一提交或回滾

batch target math frame 調用示例 turn exceptio rri action

示例代碼可以從github上獲取 https://github.com/git-simm/simm-framework.git 一、業務場景:   系統中存在一個盤庫的功能,用戶一次盤庫形成一兩萬條的盤庫明細單,一次性提交給服務器進行處理。服務器性能比較優越,平均也得運行30秒左右。性能上需要進行優化。 二、處理方案:   做過代碼分析後,發現單線程邏輯沒有什麽優化空間。開始考慮引入多線程處理模型,用10個子線程進行任務切分處理。切分子線程問題需要考慮事務的一致性。10個子線程對應10個事務,需要保證所有事務一起提交或一起回滾。這裏使用synchronized(wait,notifyall)機制做線程協作。 三、代碼實現:
  3.1、添加一個多線程協作標誌類,用於做子線程運行狀態統計,通知子線程做事務提交還是回滾的操作;
package simm.framework.threadutils.multi;

import java.util.UUID;

/**
 * 多線程結束標誌
 * 2018.09.22 by simm
 */
public class MultiEndFlag {
    private volatile boolean fired = false;
    //是否執行成功
    private volatile boolean isAllSuccess = false;
    private
volatile int threadCount = 0; private volatile int failCount = 0; /** * 初始化子線程的總數 * @param count */ public MultiEndFlag(int count){ threadCount = count; } public boolean isAllSuccess() { return isAllSuccess; } /** * 等待全部結束 * @param threadId *
@param result */ public synchronized void waitForEnd(UUID threadId,int result){ //統計失敗的線程個數 if(result==0){ failCount++; } threadCount--; while (!fired){ try { wait(); } catch (InterruptedException e) { e.printStackTrace(); } } } /** * 執行結束通知 */ public synchronized void go(){ fired = true; //結果都顯示成功 isAllSuccess = (failCount == 0); notifyAll(); } /** * 等待結束 */ public void end(){ while (threadCount > 0){ waitFunc(50); } System.out.println("線程全部執行完畢通知"); go(); } /** * 等待 */ private void waitFunc(long millis){ try { Thread.sleep(millis); } catch (InterruptedException e) { e.printStackTrace(); } } }

  3.2、提供一個數據保存服務的接口定義,一個默認的子線程任務執行類(需要接收數據保存服務實現,業務數據,協作標誌變量);

package simm.framework.threadutils.multi;

import java.util.List;
import java.util.UUID;

/**
 * 保存服務接口
 * 2018.09.22 by simm
 * @param <T>
 */
public interface ISaveService<T> {
    /**
     * 子線程批量保存方法
     * @param list
     * @param endFlag
     * @param threadId
     * @return
     * @throws Exception
     */
    Integer batchSave(List<T> list, MultiEndFlag endFlag, UUID threadId) throws Exception;
}
package simm.framework.threadutils.multi;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.Callable;

/**
 * 默認的執行任務
 * 2018.09.22 by simm
 */
public class DefaultExecTask<T> implements Callable<Integer> {
    private List<T> list;
    private ISaveService saveService;
    private MultiEndFlag endFlag;
    private UUID threadId;
    /**
     * 盤庫子任務
     * @param saveService
     * @param notes
     * @param flag
     */
    public DefaultExecTask(ISaveService saveService, List<T> notes, MultiEndFlag flag){
        this.saveService = saveService;
        this.list = notes;
        this.endFlag = flag;
        this.threadId = UUID.randomUUID();
    }
    @Override
    public Integer call() throws Exception {
        return saveService.batchSave(this.list,this.endFlag,this.threadId);
    }
}

  3.3、實現最核心的線程池分發子線程,並匯總結果通知子線程事務做最終的提交或回滾。線程池使用定長池 newFixedThreadPool,子線程使用futureTask,可接收返回值和異常信息。
package simm.framework.threadutils.multi;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/**
 * 多線程切分執行器
 * 2018.09.22 by simm
 */
public class MultiExecutor {
    private static int maxThreadCount = 10;
    /**
     * 執行方法(分批創建子線程)
     * @param saveService
     * @param notes
     * @param groupLen
     * @return
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static <T> Boolean exec(ISaveService saveService,List<T> notes,int groupLen) throws ExecutionException, InterruptedException {
        if(notes==null || notes.size()==0) return true;
        //創建一個線程池,最大10個線程
        ExecutorService executorService = Executors.newFixedThreadPool(maxThreadCount);
        List<Future<Integer>> futures = new ArrayList<>();
        int noteSize = notes.size();
        int batches = (int) Math.ceil(noteSize * 1.0 /groupLen);
        //分組超長最大線程限制,則設置分組數為10,計算分組集合尺寸
        if(batches>maxThreadCount){
            batches = maxThreadCount;
            groupLen = (int) Math.ceil(noteSize * 1.0 /batches);
        }
        System.out.println("總長度:"+noteSize+"  批次信息:"+batches+"  分組長度:"+groupLen);
        MultiEndFlag flag = new MultiEndFlag(batches);
        int startIndex, toIndex, maxIndex = notes.size();
        for(int i=0;i<batches;i++){
            startIndex = i * groupLen;
            toIndex = startIndex + groupLen;
            if(toIndex> maxIndex) {
                toIndex = maxIndex;
            }
            List<T> temp = notes.subList(startIndex,toIndex);
            if(temp == null || temp.size()==0) continue;
            futures.add(executorService.submit(new DefaultExecTask(saveService,temp,flag)));
        }
        flag.end();
        //子線程全部等待返回(存在異常,則直接拋向主線程)
        for(Future<Integer> future:futures){
            future.get();
        }
        //所有線程返回後,關閉線程池
        executorService.shutdown();
        return true;
    }
}
四、給出一個調用偽代碼。需要註意的一點,子線程開啟事務,這裏使用@Transactional聲明式事務,這要求服務的實體類需要通過spring的bean工廠創建,得到一個動態代理類,以達到支持事務攔截器的目的,保證註解的有效性。
package multi;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import simm.framework.threadutils.multi.DefaultExecTask;
import simm.framework.threadutils.multi.ISaveService;
import simm.framework.threadutils.multi.MultiEndFlag;
import simm.framework.threadutils.multi.MultiExecutor;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;

/**
 * 損益單保存服務
 */
@Service
public class DemoService implements ISaveService<NoteCheckBalance> {
    private static final Logger logger = LoggerFactory.getLogger(DefaultExecTask.class);
    @Autowired
    private NoteCheckBalanceMapper noteCheckBalanceMapper;

    /**
     * 業務保存
     * @param list
     */
    public void save(List<NoteCheckBalance> list){
        for(NoteCheckBalance item :list){
            noteCheckBalanceMapper.insert(item);
        }
    }
    /**
     * 批量保存事件
     */
    @Transactional(rollbackFor = Exception.class)
    @Override
    public Integer batchSave(List<NoteCheckBalance> list, MultiEndFlag endFlag, UUID threadId) throws Exception {
        int result = 0;
        try{
            //業務操作
            save(list);
            result = 1;
            //進行waitForEnd 操作,是為了確保所有的線程都最終通知同步協作標誌
            endFlag.waitForEnd(threadId ,result);
            //其他線程異常手工回滾
            if(result==1 && !endFlag.isAllSuccess()){
                String message = "子線程未全部執行成功,對線程["+threadId+"]進行回滾";
                throw new Exception(message);
            }
            return result;
        }catch (Exception ex){
            logger.error(ex.toString());
            if(result ==0){
                //本身線程異常拋出異常,通知已經做完(判斷是為了防止 與 try塊中的通知重復)
                endFlag.waitForEnd(threadId ,result);
            }
            throw ex;
        }
    }

    /**
     * 調用示例
     * @param args
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //調用示例
        MultiExecutor.exec(new DemoService(), new ArrayList<NoteCheckBalance>(),500);
    }
}
參考文章 https://www.cnblogs.com/LipeiNet/p/6475851.html

切分大任務成多個子任務(事務),匯總後統一提交或回滾