1. 程式人生 > >Java 使用執行緒池執行大資料量統計任務

Java 使用執行緒池執行大資料量統計任務

2017年08月13日 20:51:14

最近需要對每週生成的日誌表進行處理,並且輸出結果到另一張表。日誌表少的有300萬,多有的有上千萬條記錄。因此打算用多執行緒來處理資料。在使用執行緒池時,幾個注意點:

1、在入口的地方,直接新建一個執行緒為執行,然後返回結果,後續通過日誌表來跟蹤;

2、設定獨立的執行緒名規則,區分自動生成的執行緒名;

3、直接使用ThreadPoolExecutor,而不是借用Executors類生成;

4、利用Future的阻塞特性來控制全部執行緒執行結束的時間點;

5、考慮是否有必要增加中斷執行的機制;

6、考慮能合成批量操作的地方儘量合成批量操作。

程式碼參考:  

        //1.計算執行緒數
        int threadNum = totalCount / StatConstant.SPLIT_NUM;
        if (threadNum * StatConstant.SPLIT_NUM < totalCount) {
            threadNum++;
        }
        //2.發起執行緒
        List<Future<Integer>> futureList = new ArrayList<>();
        ThreadFactory threadFactory = new ThreadFactoryBuilder()
                .setNameFormat("LogHandlerThread-%d")
                .build();
        ExecutorService executorService = new ThreadPoolExecutor(threadNum, threadNum, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(threadNum), threadFactory);
        for (int i = 0; i < threadNum; i++) {
            int begin = i * StatConstant.SPLIT_NUM;
            int end = (i + 1) * StatConstant.SPLIT_NUM;
            if (i == threadNum - 1) {
                end = totalCount;
            }
            Future<Integer> future = executorService.submit(new LogHandlerThread(begin, end, weekNo, applicationContext));
            futureList.add(future);
        }
        //3.記錄執行緒結果
        boolean finalResult = true;
        for (int i = 0; i < futureList.size(); i++) {
            try {
                Future<Integer> future = futureList.get(i);
                Integer result = future.get();
                handleCount += ((result == null) ? 0 : result);
            } catch (Exception e) {
                weekLog.setMessage(weekLog.getMessage() + "###" + "(ThreadNum=" + i + ")" + e.getMessage());
                finalResult = false;
            }
        }
        executorService.shutdown();
        //4.執行其他任務...
     public class LogHandlerThread implements Callable<Integer> {
        public LogHandlerThread(Integer begin, Integer end, String weekNo, ApplicationContext applicationContext) {
          //初始..
        }
        @Override
        public Integer call() {
          //執行..
        }
     }