1. 程式人生 > >[email protected]異步線程池的配置及應用

[email protected]異步線程池的配置及應用

div row bject red 線程 contains bsp throw rup

示例:

1、 配置

@EnableAsync
@Configuration
public class TaskExecutorConfiguration {

    @Autowired
    private TaskExecutorProperties taskExecutorProperties;

    @Bean
    public Executor routeGen() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(taskExecutorProperties.getCorePoolSize());     
//50 executor.setQueueCapacity(taskExecutorProperties.getQueueCapacity()); //200 executor.setMaxPoolSize(taskExecutorProperties.getMaxPoolSize()); //500 executor.setKeepAliveSeconds(taskExecutorProperties.getKeepAliveSeconds()); executor.setThreadNamePrefix("gen-"); executor.setRejectedExecutionHandler(
new ThreadPoolExecutor.AbortPolicy()); executor.initialize(); return executor; }
}

2、 運用(作用於方法上)

@Async("routeGen")
public Future<Result<String>> genRouteByCategory(RouteGenDTO routeGenDTO, List<String> cityList, String category){
}

3、 異常處理及日誌記錄

   public
Result<String> genRouteByShard(RouteGenDTO routeGenDTO) throws RouteGenException { // 根據分片獲取城市 Result<List<String>> cityResult = cityService.queryAllCity(); if (cityResult == null || !cityResult.isSuccess()) { String errorMsg = cityResult == null ? "查詢城市沒有返回結果" : cityResult.getMsg(); // 記錄查詢城市日誌 logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, null, errorMsg); return Result.ofFail(ErrorEnum.QUERY_ERROR.getCode(), errorMsg); } List<String> cityList = cityResult.getData(); if (CollectionUtils.isEmpty(cityList)) { String errorMsg = "沒有查詢到城市,無需生成路線"; // 記錄查詢城市日誌 logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, null, errorMsg); return Result.ofSuccessMsg(errorMsg); } // 記錄查詢城市日誌 logQueryCityByShard(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, JSON.toJSONString(cityList), null); Map<String, Future<Result<String>>> categoryFutureMap = new ConcurrentHashMap<>(); // 禁用的類別列表 List<String> disableCategoryList = null; String disableCategory = genProperties.getDisableCategory(); if (StringUtils.isNotBlank(disableCategory)) { String[] disableCategoryArr = disableCategory.split(","); disableCategoryList = Stream.of(disableCategoryArr).collect(Collectors.toList()); } // 根據類別生成路線 // 每個類別一個線程 for (RouteCategoryEnum routeCategory : RouteCategoryEnum.values()) { // 跳過禁用的類別 if (CollectionUtils.isNotEmpty(disableCategoryList) && disableCategoryList.contains(routeCategory.getType())) { // 記錄生成日誌 logGenRouteByCategory(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, routeCategory.getType(), routeCategory.getType() + "類別被禁用,無需生成"); continue; } // 根據城市和類別生成路線 Future<Result<String>> categoryFuture = routeGenService.genRouteByCategory(routeGenDTO, cityList, routeCategory.getType()); categoryFutureMap.put(routeCategory.getType(), categoryFuture); } String lastErrorMsg = null; for (Map.Entry<String, Future<Result<String>>> entry : categoryFutureMap.entrySet()) { String errorMsg = null; String category = entry.getKey(); Future<Result<String>> futureResult = entry.getValue(); if (futureResult != null) { try { if (futureResult.isCancelled()) { errorMsg = "線程被取消"; log.error("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 處理失敗,錯誤描述:{}", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg); continue; } Result<String> result = futureResult.get(CommonConfConstants.FUTURE_CATEGORY_WAIT_TIME, TimeUnit.SECONDS); if (result == null || !result.isSuccess()) { errorMsg = result == null ? category + "沒有返回處理結果" : result.getMsg(); log.error("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 處理失敗,錯誤描述:{}", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg); } else { log.info("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 生成成功", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), result); } } catch (TimeoutException e) { errorMsg = category + "生成路線 線程處理超時:" + e.getMessage(); log.error("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 線程處理超時", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e); } catch (InterruptedException e) { errorMsg = category + "生成路線 線程中斷異常:" + e.getMessage(); log.error("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 線程中斷異常", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e); } catch (ExecutionException e) { errorMsg = category + "生成路線 線程執行異常:" + e.getMessage(); log.error("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 線程執行異常", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), e); } } else { errorMsg = category + "生成路線 線程返回結果為null"; log.error("分片項{} 分片總數{} 生成 {} 路線 任務開始時間:{} 處理失敗,錯誤描述:{}", routeGenDTO.getShardItem(), routeGenDTO.getShardTotal(), category, routeGenDTO.getStartTime(), errorMsg); } // 記錄生成日誌 logGenRouteByCategory(RouteJobTypeEnum.ROUTE_GEN, routeGenDTO, category, errorMsg); if (StringUtils.isNotBlank(errorMsg)) { lastErrorMsg = errorMsg; } } if (StringUtils.isNotBlank(lastErrorMsg)) { return Result.ofFail(ErrorEnum.GEN_ROUTE_FAILURE.getCode(), lastErrorMsg); } return Result.ofSuccess("路線生成成功"); }

private void logQueryCityByShard(RouteJobTypeEnum routeJobTypeEnum, RouteGenDTO routeGenDTO, String cityJson, String errorMsg) {
String logFormat;
String logMsg;
if (StringUtils.isBlank(errorMsg)) {
logFormat = "查詢城市成功,城市信息:%s";
logMsg = String.format(logFormat, cityJson);
} else {
logMsg = errorMsg;
}

String operationObject = routeJobTypeEnum.getType() + "_job_"
+ routeGenDTO.getShardItem() + "_"
+ TimeUtil.dateTimeToStr(routeGenDTO.getStartTime(), DateFormatConstants.DATE_TIME_COMPACT_FORMAT);

LogInfoDTO logInfoDTO = new LogInfoDTO();
logInfoDTO.setBusinessName(routeJobTypeEnum.getDesc());
logInfoDTO.setOperationName("根據分片查詢城市");
logInfoDTO.setOperationObject(operationObject);
logInfoDTO.setOperationDesc(logMsg);
LogUtil.log(logInfoDTO);
}
 

[email protected]異步線程池的配置及應用