Java多執行緒CountDownLatch與執行緒池ThreadPool的使用
一. CountDownLatch是什麼
CountDownLatch是一個同步工具類,它允許一個或多個執行緒等待,直到在其他執行緒中執行的一組操作完成。
CountDownLatch是通過一個計數器實現的,計數器的初始值為執行緒的數量,這是一個一次性現象,計數不能重置。如果需要重置計數,可以考慮使用cyclicbarrier。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。
二. CountDownLatch中的主要方法
1. public void CountDownLatch(int count) {…} 構造器中的計數值(count)實際上就是需要等待的執行緒數量。這個值只能被設定一次,而且CountDownLatch沒有提供任何機制去重置這個計數值。與CountDownLatch第一次互動式主執行緒等待其他執行緒,主執行緒必須在啟動其他執行緒後立即呼叫CountDownLatch.await()方法。這樣主執行緒的操作就會在這個方法上阻塞,直到其他執行緒完成各自的任務。
2. await() :導致當前執行緒等待,直到計數器下降到零,除非執行緒被中斷。如果當前計數為零,則此方法立即返回。如果當前計數大於零,則當前執行緒將禁用執行緒排程,並且處於休眠狀態,直到出現兩種情況之一:計數達到零的countdown()方法呼叫;或其他執行緒中斷當前執行緒。
3. countDown():每呼叫一次這個方法,在建構函式中初始化的count值就減1。直到count的值為0,主執行緒就能通過await()方法,恢復執行自己的任務。
三. 執行緒池
Java通過Executors提供四種執行緒池,分別是:
1. newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
2. newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
3. newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
4. newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。
四. 使用場景
利用CountDownLatch和newFixedThreadPool實現excel批量匯入需求。關鍵程式碼:
/**
* Description:
* 執行緒池工具類
* 保證全域性使用一個相同的執行緒池,方便控制執行緒的建立和銷燬
*/
public class ThreadPoolUtil {
private static final ExecutorService pool = Executors.newFixedThreadPool(1000);
public static void submit(Runnable runnable){
pool.submit(runnable);
}
}
/**
* 後臺掛起執行列傳商品以及插入資料庫
*/
Map<String, Object> runAdd(final Map<Integer,String> errorRowMap,final HSSFSheet hssfSheet, final WmsCompany company){
final Map<String, Object> result = new HashMap<>();
result.put("repeatCount", 0);
//儲存商品資料的集合
final Map<String, WmsGoods> goodsMap = new ConcurrentHashMap<>();
//通過檢查的excel行數,即匯入的商品數
final int rowCount = **;
//使新增執行緒等待轉換執行緒執行後執行的工具
final CountDownLatch doWork = new CountDownLatch(rowCount);
try {
for (int i = 2; i <= rowCount; i++) {
final HSSFRow hssfRow = hssfSheet.getRow(i);
if (hssfRow == null){
doWork.countDown();
continue;
}
if( errorRowMap.containsKey(i)){
doWork.countDown();
continue;
}
ThreadPoolUtil.submit(new Runnable() {
@Override
public void run() {
WmsGoods wmsGoods = cellToWmsGoods(hssfRow);
if (wmsGoods != null) {
try{
wmsGoods.setCompid(company.getCompid());
wmsGoods.setStoreid(company.getStoreId().intValue());
wmsGoods.setStorename(company.getStoreName());
wmsGoods.setCreaemp(company.getMobile());
wmsGoods.setCrearq(DateUtil.getStrTime());
wmsGoods.setGoodType("0");//預設普通商品
......
logger.info("公司資訊:"+new Gson().toJson(company).toString());
}catch(Exception e){
e.printStackTrace();
}
}
//遞減鎖存器的計數
doWork.countDown();
}
});
}
//新增執行緒等待轉換線執行
doWork.await();
//執行新增操作
List<WmsGoods> goodsList = new CopyOnWriteArrayList<>();
goodsList.addAll(goodsMap.values());
result.putAll(wmsGoodsService.addGoodsTimer(goodsList));
} catch (Exception e) {
e.printStackTrace();
logger.error("新增商品失敗!");
result.put("state", 1);
result.put("msg", e.getMessage());
}finally {
logger.info("新增商品完成---------------!");
}
return result;
}
五. 遇到的問題
1. 在使用執行緒池批量匯入,幾次遇到了伺服器多次掛了的情況,排查原因,主要是程式的執行緒池未及時回收,導致資源不足,所以採用了一個全域性的執行緒池,固定開啟1000個執行緒。
2. 底層採用的是mybatis,最開始是用的批量更新方法,將每次匯入的資料一次入庫,更新,後來遇到時間過長的問題,所以改成了單條執行插入,更新操作。