1. 程式人生 > >一次刪資料而認識的CountDownLatch和CyclicBarrier

一次刪資料而認識的CountDownLatch和CyclicBarrier

公司之前有個任務,要求刪除一張資料庫表裡面2018/2/1之前的資料。這張表裡面存放的是車輛定位資料,一輛車每天能產生4000+條定位資料,所以整個表蠻大的,有65億+條資料。而且還有要求:根據每個地區要統計出來這個地區刪除了多少條資料。其中2月1號之前的有10億多條。當然這是刪除完之後才統計出來的。

一開始是這樣做的:查詢某個地區,時間在2018/2/1之前的資料,用Mongo遊標hasNext()遍歷,當遍歷出的結果有500條時,用執行緒池刪除。大概程式碼是這樣的:

        while (mongoCursor.hasNext()) {
         i++;
            list.add(mongoCursor.next());
            //批量刪除500條資料
            if (i >= 500) {
                pool.execute(new DeleteRunnable(list));
                i=0;
                list = new ArrayList<>();
            }
        }

程式執行時發現:mongoCursor.hasNext()方法時常"卡死",導致後面的語句無法執行,但又不丟擲異常,導致一天刪不了多少資料。原因到現在未知...

但是後面改成了這個方法也有問題:主執行緒每次查詢1w條資料,分給執行緒池去做刪除操作,執行緒每次刪除500條資料。這種方法的問題是:主執行緒第1次查詢出A資料後,執行緒池還沒來得及刪除,主執行緒第2次查詢又把A資料查詢出來了,交給執行緒池做刪除操作。程式執行後發現:1w條資料,大約有一半資料已經不存在,導致刪除失敗...

查詢1w條資料大概需要70s左右(好慢,而且還是根據索引查詢...),刪除500條資料大約需要10s左右(根據主鍵_id刪除),所以我改成了每次查詢1w條資料後,休眠10s...情況好多了...

前面講了這麼多還是為了引出CountDownLatch和CyclicBarrier,CountDownLatch是後面公司的同事告訴我的。他的提議是,用sleep休眠也只能粗略地估算刪除的時間,主執行緒查詢資料後,可以用CountDownLatch的await方法阻塞主執行緒,等待執行緒池中的執行緒都執行了刪除方法後,再重新查詢資料庫。大概程式碼:

        //findDatas為查找出來的1w條資料
        int count = findDatas.size() % 500 == 0 ? (findDatas.size() / 500) : (findDatas.size() / 500 + 1);
        CountDownLatch countDownLatch = new CountDownLatch(count);
        List<String> deleteIDs = new ArrayList<>();
        for (int i = 0; i < findDatas.size(); i++) {
            deleteIDs.add(findDatas.get(i));

            //批量刪除500條資料
            if (deleteIDs.size() >= 500 || i == findDatas.size() - 1) {
                pool.execute(new DeleteRunnable(deleteIDs, countDownLatch));
                deleteIDs = new ArrayList<>();
            }
        }

        System.out.println("任務分配完成,主執行緒等待任務完成");
        try {
            //主執行緒阻塞等待
            countDownLatch.await();
            continue;
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

後來去網上找了一下CountDownLatch這個類,發現很多大佬的部落格都是把它和CyclicBarrier放在一起講,那我也就拜讀了幾篇大佬的文章,自己做下總結。

CountDownLatch

public CountDownLatch(int count):構造方法,宣告有count把鎖。

public void countDown():鎖的個數減一。

public void await():此方法會阻塞當前執行緒,直到鎖的個數減為0。

public boolean await(long timeout, TimeUnit unit):此方法會阻塞當前執行緒,直到鎖的個數減為0 或者 超時。(對比Future類,此方法超時不會丟擲異常)

public long getCount():當前還剩多少把鎖。(返回值 >=0,不會出現為負數的情況)

CyclicBarrier

public CyclicBarrier(int parties):構造方法,宣告CyclicBarrier屏障要攔截的數量parties。

public CyclicBarrier(int parties, Runnable barrierAction):構造方法,宣告屏障要攔截的數量parties,並且在攔截完parties數量後,執行barrierAction方法。

public int getParties():返回障要攔截的數量,也就是構造方法中的parties的值。

public int await():此方法會阻塞當前執行緒,直到到達屏障的數量為parties。

public int await(long timeout, TimeUnit unit):此方法會阻塞當前執行緒,直到到達屏障的數量為parties 或者 超時。(超時會丟擲TimeoutException,導致其他執行緒呼叫await方法會丟擲BrokenBarrierException)

public boolean isBroken():當前屏障是否處於broken狀態。

public int getNumberWaiting():返回已經到達屏障被阻塞的數量。

public void reset():重置屏障為初始狀態,如果有執行緒正在等待障礙,會丟擲BrokenBarrierException。處於broken狀態的屏障重置會很複雜,所以如果當前屏障處於broken狀態,建議建立一個新的CyclicBarrier物件會更好。

CyclicBarrier和CountDownLatch對比

當然是相比於CountDownLatch只能用一次,CyclicBarrier能複用啦....

CyclicBarrierTest類中,進行了兩次比賽,用的都是同一個CyclicBarrier物件,程式碼:

public class MyRunnable1 implements Runnable {
    CyclicBarrier cyclicBarrier;

    public MyRunnable1(CyclicBarrier cyclicBarrier) {
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println(Thread.currentThread().getName() + String.format(":我前面有%s人已經準備好了。", cyclicBarrier.getNumberWaiting()));
        try {
            cyclicBarrier.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
public class CyclicBarrierTest {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人到齊,比賽開始.....");
                System.out.println("恭喜rng...");
            }
        });

        System.out.println(String.format("五排需要幾個人?\t答:%s人", cyclicBarrier.getParties()));

        for (int i = 0; i < 5; i++) {
            TimeUnit.SECONDS.sleep(1);
            new Thread(new MyRunnable1(cyclicBarrier)).start();
        }

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("finished...");

        System.out.println("\n=============第二次比賽開始...===================\n");
        for (int i = 0; i < 5; i++) {
            TimeUnit.SECONDS.sleep(1);
            new Thread(new MyRunnable1(cyclicBarrier)).start();
        }

        try {
            TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("finished...");

    }
}

結果: