1. 程式人生 > >Java多執行緒CountDownLatch與執行緒池ThreadPool的使用

Java多執行緒CountDownLatch與執行緒池ThreadPool的使用

一. CountDownLatch是什麼
    CountDownLatch是一個同步工具類,它允許一個或多個執行緒等待,直到在其他執行緒中執行的一組操作完成。
    CountDownLatch是通過一個計數器實現的,計數器的初始值為執行緒的數量,這是一個一次性現象,計數不能重置。如果需要重置計數,可以考慮使用cyclicbarrier。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。

二. CountDownLatch中的主要方法
    1. public void CountDownLatch(int count) {…} 構造器中的計數值(count)實際上就是需要等待的執行緒數量。這個值只能被設定一次,而且CountDownLatch沒有提供任何機制去重置這個計數值。與CountDownLatch第一次互動式主執行緒等待其他執行緒,主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。
    2. await() :導致當前執行緒等待,直到計數器下降到零,除非執行緒被中斷。如果當前計數為零,則此方法立即返回。如果當前計數大於零,則當前執行緒將禁用執行緒排程,並且處於休眠狀態,直到出現兩種情況之一:計數達到零的countdown()方法呼叫;或其他執行緒中斷當前執行緒。
    3. countDown():每呼叫一次這個方法,在建構函式中初始化的count值就減1。直到count的值為0,主執行緒就能通過await()方法,恢復執行自己的任務。

三. 執行緒池
    Java通過Executors提供四種執行緒池,分別是:
    1. newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
    2. newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
    3. newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
    4. newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

四. 使用場景
    利用CountDownLatch和newFixedThreadPool實現excel批量匯入需求。關鍵程式碼:

/**
 * Description:
 * 執行緒池工具類
 * 保證全域性使用一個相同的執行緒池,方便控制執行緒的建立和銷燬
 */
public class ThreadPoolUtil {

    private static final ExecutorService pool = Executors.newFixedThreadPool(1000);

    public static void submit(Runnable runnable){
        pool.submit(runnable);
    }
}
/**
* 後臺掛起執行列傳商品以及插入資料庫
*/
Map<String, Object> runAdd(final Map<Integer,String> errorRowMap,final HSSFSheet hssfSheet, final WmsCompany company){
   final Map<String, Object> result = new HashMap<>();
   result.put("repeatCount", 0);

   //儲存商品資料的集合
   final Map<String, WmsGoods> goodsMap = new ConcurrentHashMap<>();

   //通過檢查的excel行數,即匯入的商品數
   final int rowCount = **;

   //使新增執行緒等待轉換執行緒執行後執行的工具
   final CountDownLatch doWork = new CountDownLatch(rowCount);

   try {
       for (int i = 2; i <= rowCount; i++) {
           final HSSFRow hssfRow = hssfSheet.getRow(i);

           if (hssfRow == null){
               doWork.countDown();
               continue;
           }

           if( errorRowMap.containsKey(i)){
               doWork.countDown();
               continue;
           }

ThreadPoolUtil.submit(new Runnable() {
     @Override
     public void run() {
        WmsGoods wmsGoods = cellToWmsGoods(hssfRow);

          if (wmsGoods != null) {

            try{
               wmsGoods.setCompid(company.getCompid());
               wmsGoods.setStoreid(company.getStoreId().intValue());
               wmsGoods.setStorename(company.getStoreName());
               wmsGoods.setCreaemp(company.getMobile());
               wmsGoods.setCrearq(DateUtil.getStrTime());
               wmsGoods.setGoodType("0");//預設普通商品
            ......
 logger.info("公司資訊:"+new Gson().toJson(company).toString());

               }catch(Exception e){
                  e.printStackTrace();
               }
         }
             //遞減鎖存器的計數
             doWork.countDown();
       }
    });

 }

      //新增執行緒等待轉換線執行
      doWork.await();
       //執行新增操作
       List<WmsGoods> goodsList = new CopyOnWriteArrayList<>();
       goodsList.addAll(goodsMap.values());

       result.putAll(wmsGoodsService.addGoodsTimer(goodsList));
   } catch (Exception e) {
       e.printStackTrace();

       logger.error("新增商品失敗!");
       result.put("state", 1);
       result.put("msg", e.getMessage());
   }finally {
       logger.info("新增商品完成---------------!");
   }
   return result;
}

五. 遇到的問題

    1. 在使用執行緒池批量匯入,幾次遇到了伺服器多次掛了的情況,排查原因,主要是程式的執行緒池未及時回收,導致資源不足,所以採用了一個全域性的執行緒池,固定開啟1000個執行緒。

    2. 底層採用的是mybatis,最開始是用的批量更新方法,將每次匯入的資料一次入庫,更新,後來遇到時間過長的問題,所以改成了單條執行插入,更新操作。