一次刪資料而認識的CountDownLatch和CyclicBarrier
公司之前有個任務,要求刪除一張資料庫表裡面2018/2/1之前的資料。這張表裡面存放的是車輛定位資料,一輛車每天能產生4000+條定位資料,所以整個表蠻大的,有65億+條資料。而且還有要求:根據每個地區要統計出來這個地區刪除了多少條資料。其中2月1號之前的有10億多條。當然這是刪除完之後才統計出來的。
一開始是這樣做的:查詢某個地區,時間在2018/2/1之前的資料,用Mongo遊標hasNext()遍歷,當遍歷出的結果有500條時,用執行緒池刪除。大概程式碼是這樣的:
while (mongoCursor.hasNext()) { i++; list.add(mongoCursor.next()); //批量刪除500條資料 if (i >= 500) { pool.execute(new DeleteRunnable(list)); i=0; list = new ArrayList<>(); } }
程式執行時發現:mongoCursor.hasNext()方法時常"卡死",導致後面的語句無法執行,但又不丟擲異常,導致一天刪不了多少資料。原因到現在未知...
但是後面改成了這個方法也有問題:主執行緒每次查詢1w條資料,分給執行緒池去做刪除操作,執行緒每次刪除500條資料。這種方法的問題是:主執行緒第1次查詢出A資料後,執行緒池還沒來得及刪除,主執行緒第2次查詢又把A資料查詢出來了,交給執行緒池做刪除操作。程式執行後發現:1w條資料,大約有一半資料已經不存在,導致刪除失敗...
查詢1w條資料大概需要70s左右(好慢,而且還是根據索引查詢...),刪除500條資料大約需要10s左右(根據主鍵_id刪除),所以我改成了每次查詢1w條資料後,休眠10s...情況好多了...
前面講了這麼多還是為了引出CountDownLatch和CyclicBarrier,CountDownLatch是後面公司的同事告訴我的。他的提議是,用sleep休眠也只能粗略地估算刪除的時間,主執行緒查詢資料後,可以用CountDownLatch的await方法阻塞主執行緒,等待執行緒池中的執行緒都執行了刪除方法後,再重新查詢資料庫。大概程式碼:
//findDatas為查找出來的1w條資料 int count = findDatas.size() % 500 == 0 ? (findDatas.size() / 500) : (findDatas.size() / 500 + 1); CountDownLatch countDownLatch = new CountDownLatch(count); List<String> deleteIDs = new ArrayList<>(); for (int i = 0; i < findDatas.size(); i++) { deleteIDs.add(findDatas.get(i)); //批量刪除500條資料 if (deleteIDs.size() >= 500 || i == findDatas.size() - 1) { pool.execute(new DeleteRunnable(deleteIDs, countDownLatch)); deleteIDs = new ArrayList<>(); } } System.out.println("任務分配完成,主執行緒等待任務完成"); try { //主執行緒阻塞等待 countDownLatch.await(); continue; } catch (InterruptedException e) { e.printStackTrace(); }
後來去網上找了一下CountDownLatch這個類,發現很多大佬的部落格都是把它和CyclicBarrier放在一起講,那我也就拜讀了幾篇大佬的文章,自己做下總結。
CountDownLatch
public CountDownLatch(int count):構造方法,宣告有count把鎖。
public void countDown():鎖的個數減一。
public void await():此方法會阻塞當前執行緒,直到鎖的個數減為0。
public boolean await(long timeout, TimeUnit unit):此方法會阻塞當前執行緒,直到鎖的個數減為0 或者 超時。(對比Future類,此方法超時不會丟擲異常)
public long getCount():當前還剩多少把鎖。(返回值 >=0,不會出現為負數的情況)
CyclicBarrier
public CyclicBarrier(int parties):構造方法,宣告CyclicBarrier屏障要攔截的數量parties。
public CyclicBarrier(int parties, Runnable barrierAction):構造方法,宣告屏障要攔截的數量parties,並且在攔截完parties數量後,執行barrierAction方法。
public int getParties():返回障要攔截的數量,也就是構造方法中的parties的值。
public int await():此方法會阻塞當前執行緒,直到到達屏障的數量為parties。
public int await(long timeout, TimeUnit unit):此方法會阻塞當前執行緒,直到到達屏障的數量為parties 或者 超時。(超時會丟擲TimeoutException,導致其他執行緒呼叫await方法會丟擲BrokenBarrierException)
public boolean isBroken():當前屏障是否處於broken狀態。
public int getNumberWaiting():返回已經到達屏障被阻塞的數量。
public void reset():重置屏障為初始狀態,如果有執行緒正在等待障礙,會丟擲BrokenBarrierException。處於broken狀態的屏障重置會很複雜,所以如果當前屏障處於broken狀態,建議建立一個新的CyclicBarrier物件會更好。
CyclicBarrier和CountDownLatch對比
當然是相比於CountDownLatch只能用一次,CyclicBarrier能複用啦....
CyclicBarrierTest類中,進行了兩次比賽,用的都是同一個CyclicBarrier物件,程式碼:
public class MyRunnable1 implements Runnable {
CyclicBarrier cyclicBarrier;
public MyRunnable1(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + String.format(":我前面有%s人已經準備好了。", cyclicBarrier.getNumberWaiting()));
try {
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
public class CyclicBarrierTest {
public static void main(String[] args) throws InterruptedException {
CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
@Override
public void run() {
System.out.println("所有人到齊,比賽開始.....");
System.out.println("恭喜rng...");
}
});
System.out.println(String.format("五排需要幾個人?\t答:%s人", cyclicBarrier.getParties()));
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
new Thread(new MyRunnable1(cyclicBarrier)).start();
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished...");
System.out.println("\n=============第二次比賽開始...===================\n");
for (int i = 0; i < 5; i++) {
TimeUnit.SECONDS.sleep(1);
new Thread(new MyRunnable1(cyclicBarrier)).start();
}
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finished...");
}
}
結果: