1. 程式人生 > >批量介面多執行緒併發執行

批量介面多執行緒併發執行

開發中可能會有這樣的場景,一個功能需要同時完成不相關的多個操作,最後針對多個操作結果統一處理。比如一個查詢功能內部需要同時查詢A、B、C、D四個介面,彙總所有介面查詢內容後返回,一般來說可以逐個查詢彙總即可。

現在問題來了,如介面超時時間限制為3秒,且A、B、C、D四個介面每個介面RT=1,每一個介面都有RT消耗,則同時完成四個介面查詢併合並返回的時間必然大於4秒,導致總查詢功能介面超時。一般大家會想到使用Future,即希望通過多執行緒併發執行解決該問題。查詢執行開始,啟動四個執行緒同時執行A、B、C、D四個介面查詢,最後合併多個執行緒查詢結果即可。開發過程中一般結合線程池配合,以保證更好的執行緒管理。

簡單示例如下<僅供參考>:

1、執行任務執行緒

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

/**
 * 服務提供描述
 */
public class ServiceInvoker implements Callable<List<InsurePolicy>> {

    private InsureQueryService insureQueryService;//介面
    
    private String userId;//引數

    private String desc;//介面描述,方便介面異常統計
    
    public ServiceInvoker() {

    }
    
    /**
     * 執行緒任務構造類
     */
    public ServiceInvoker(String userId, String method, String desc, InsureQueryService insureQueryService) {
        this.userId = userId;
        this.desc = desc;
        this.insureQueryService = insureQueryService;
    }
    
    @Override
    public List<InsurePolicy> call() throws Exception {
        try{
            long start = System.currentTimeMillis();
            InsureResult<List<InsurePolicy>> insureResult = insureQueryService.queryPolicyList("***", userId);
            long end = System.currentTimeMillis();
            LoggerUtil.rtLog("UserId[" + userId + "],Interface[" + desc + "],耗時: ", end - start);
            return insureResult.getResult();
        }catch(Exception e){
            LoggerUtil.log("ServiceInvoker_queryPolicyList_callback error:userId=" + userId, e);
        }
        
        return new ArrayList<InsurePolicy>();
    }
}

2、執行緒池-批任務併發執行

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;

/**
 * 多執行緒批量請求介面
 */
public class BatchInvokerServiceImpl implements BatchInvokerService {
    /**
     * 呼叫的執行緒池大小
     */
    private int poolSize = 60;

    private int timeOut = 3000;

    private ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    
    @Resource
    private InsureQueryService insureQueryService1;
    @Resource
    private InsureQueryService insureQueryService2;
    
    private List<InsurePolicy> invokeQueryPolicy(List<ServiceInvoker> invokerList) {
        List<InsurePolicy> results = new ArrayList<InsurePolicy>();
        try {
            int index = 0;
            List<Future<List<InsurePolicy>>> futures = pool.invokeAll(invokerList, timeOut, TimeUnit.MILLISECONDS);
            for (Future<List<InsurePolicy>> future : futures) {
                try {
                    if (future != null) {
                        if (future.isCancelled()) {
                            LoggerUtil.log("insure_invokeHSFQueryPolicy_fail:[" + InterfaceEnum.getByIndex(index) + "]Future已超時被終止");
                            continue;
                        }
                        List<InsurePolicy> resultList = future.get();
                        if (null != resultList && !resultList.isEmpty()) {
                            results.addAll(resultList);
                        }
                    }
                } catch (Exception e) {
                    LoggerUtil.log("insure_invokeHSFQueryPolicy_error:[" + InterfaceEnum.getByIndex(index) + "]Feture取值異常", e);
                }
            }
        } catch (Exception e) {
            LoggerUtil.log("insure_invokeHSFQueryPolicy_error:執行緒池異常", e);
        }
        return results;
    }
}

3、對外查詢介面定義

    /**
     * 執行批處理執行緒
     */
    public List<InsurePolicy> queryPolicyList(String userId) {
        List<ServiceInvoker> invokers = new ArrayList<ServiceInvoker>();
        invokers.add(new ServiceInvoker(userId, "A-DB查詢", insureQueryService1));
        invokers.add(new ServiceInvoker(userId, "B-DB查詢", insureQueryService2));
        
        return invokeQueryPolicy(invokers);
    }

應用中可以做更多的優化和擴充套件,支援不同型別介面、引數併發執行,以實現工具通用化。