1. 程式人生 > >CountDownLatch 閉鎖、Semaphore信號量、Barrier柵欄

CountDownLatch 閉鎖、Semaphore信號量、Barrier柵欄

阻塞 dem leg ror try rsync and 參與 time()

  同步工具類可以是任何一個對象。阻塞隊列可以作為同步工具類,其他類型的同步工具類還包括信號量(Semaphore)、柵欄(Barrier)、以及閉鎖(Latch)。

  所有的同步工具類都包含一些特定的結構化屬性:它們封裝了一些狀態,這些狀態將決定執行同步工具類的線程是繼續執行還是等待,此外還提供了一些方法對狀態進行操作,以及另一些方法用於高效地等待同步工具類進入到預期狀態。

1.閉鎖

  閉鎖是一種同步工具類,可以延遲線程進度直到其到達終止狀態。閉鎖的作用相當於一扇門:在閉鎖到達結束狀態之前,這扇門一直是關閉的,並且沒有任何線程能通過,當到達結束狀態時允許所有的線程通過。當閉鎖到達結束狀態後,將不會再改變狀態,因此這扇門將永遠打開。閉鎖可以用來確保某些活動直到其他活動都完成才繼續執行。

  CountDownLatch是一種靈活的閉鎖實現,它可以使一個或多個線程等待一組線程。閉鎖狀態包括一個計數器,該計數器被初始化為一個正數,表示需要等待的事件數量。countDown遞減計數器,表示一個事件已經發生,而await方法等待計數器達到零,這表示所有需要等待的事件都已經發生。如果計數器的值非零,那麽await會一直阻塞直到計數器為零,或者等待中的線程中斷,或者等待超時。

查看源碼發現:我們傳進去的參數相當於內部Sync的狀態,每次調用countDown的時候將狀態值減一,狀態值為0表示結束狀態(await會解除阻塞)

    public CountDownLatch(int
count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); } public void countDown() { sync.releaseShared(1); } public void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }

查看sync的源碼:

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }
    ...
}

例如:實現一個統計多個線程並發執行任務的用時功能:

  當線程執行run中代碼的時候會阻塞到startLatch.await(); 直到主線程調用startLatch.countDown(); 將計數器減一。這時所有線程開始執行任務。

  當線程執行完的時候endLatch.countDown();將結束必鎖的計數器減一,此時主線程阻塞在endLatch.await();,直到5個線程都執行完主線程也解除阻塞。

package cn.qlq.thread.tone;

import java.util.concurrent.CountDownLatch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Administrator
 *
 */
public class Demo4 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo4.class);

    public static void main(String[] args) throws InterruptedException {
        final CountDownLatch startLatch = new CountDownLatch(1);
        final CountDownLatch endLatch = new CountDownLatch(5);
        for (int i = 0; i < 5; i++) {
            Thread.sleep(1 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        startLatch.await();// 起始閉鎖的計數器阻塞等到計數器減到零(標記第一個線程開始執行)
                        Thread.sleep(1 * 1000);
                        endLatch.countDown();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }).start();
        }

        // 實現計時
        long startTime = System.nanoTime();
        startLatch.countDown();// 將起始閉鎖的計數器減一
        endLatch.await();// 結束閉鎖阻塞直到計數器為零
        long endTime = System.nanoTime();
        LOGGER.error("結束,用時{}", endTime - startTime);
    }
}

2.Semaphore 信號量

  計數信號量(counting Semaphore)用來控制同時訪問某個資源的數量,或者同時執行某個操作的數量。計數信號量還可以實現某種資源池,或者對容器實施邊界。

  信號量是1個的Semaphore意味著只能被1個線程占用,可以用來設計同步(相當於互斥鎖)。信號量大於1的Semaphore可以用來設計控制並發數,或者設計有界容器。

  Semaphore中管理者一組虛擬的許可(permit),許可的初始數量可由構造函數指定。在執行操作時首先獲得許可(只要還有剩余的許可),並在使用後釋放。如果沒有許可,那麽acquire將會一直阻塞直到有許可(或者直到中斷或者操作超時)。release方法將返回一個許可給信號量。計算信號量的一種簡化形式是二值信號量,即初始值為1的Semaphore。二值信號量可以用作互斥體(mutex),並具備不可重入的加鎖語義:誰擁有了這個唯一的許可誰就擁有了互斥鎖。

例如:例如信號量構造一個有界阻塞容器:

  信號量的計數值初始化為容器的最大值。add操作在向底層容器添加一個元素之前,首先要獲取一個許可。如果add沒有添加任何元素,那麽會立刻釋放信號量。同樣,remove操作釋放一個許可,使更多的元素能加到容器中。

class BoundedHashSet<T> {
    private Set<T> set;
    private Semaphore semaphore;

    public BoundedHashSet(int bound) {
        set = Collections.synchronizedSet(new HashSet());
        semaphore = new Semaphore(bound);
    }

    public boolean add(T o) throws InterruptedException {
        semaphore.acquire();// 嘗試獲取信號量
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            if (!wasAdded) {// 如果添加失敗就釋放信號量,添加成功就占用一個信號量
                semaphore.release();
            }
        }
    }

    public boolean remove(T o) throws InterruptedException {
        boolean remove = set.remove(o);
        if (remove)// 如果刪除成功之後就釋放一個信號量
            semaphore.release();
        return remove;
    }
}

測試代碼:

        BoundedHashSet<String> boundedHashSet = new BoundedHashSet<String>(3);
        System.out.println(boundedHashSet.add("1"));
        System.out.println(boundedHashSet.add("2"));
        System.out.println(boundedHashSet.add("2"));
        System.out.println(boundedHashSet.add("3"));
        System.out.println(boundedHashSet.add("4"));// 將會一直阻塞到這裏
        System.out.println("=========");

結果:(JVM不會關閉)

技術分享圖片

註意:

1.Semaphore可以指定公平鎖還是非公平鎖,默認是非公平鎖

    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

2.acquire方法和release方法是可以有參數的,表示獲取/返還的信號量個數

    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

3. Barrier柵欄

  柵欄(Barrier)類似於閉鎖(一種同步工具,可以延遲線程直到其達到其終止狀態),它能阻塞一組線程直到某個事件發生。柵欄與閉鎖的區別在於所有線程必須同時到達柵欄位置,才能繼續執行。閉鎖等於等待事件,而柵欄用於等待其他線程。柵欄可以用於實現一些協議,例如幾個家庭成員決定在某個地方集合:"所有人6:00到達目的地,然後討論下一步的事情"。

3.1 CyclicBarrier柵欄(循環屏障)

  CyclicBarrier可以使一定數量的參與方反復地在柵欄位置匯集,它在並行叠代算法中非常有用:這種算法通常將一個問題劃分成一系列相互獨立的子問題。當線程到達柵欄位置時將調用await方法,這個方法將阻塞到所有線程到達柵欄位置。如果所有線程都到達柵欄,那麽柵欄將打開所有線程被釋放,而柵欄將被重置以便下次使用。如果對await的調用超時,或者await阻塞的線程被中斷,那麽柵欄就被認為是打破了,所有阻塞的await調用都將終止並拋出BrokenBarrierException。如果成功的通過柵欄,那麽await將為每個線程返回一個唯一的到達索引號,我們可以用這些索引號"選舉"產生一個領導線程,並在下一次叠代中由該領導線程執行一些特殊的工作。CyclicBarrier還可以使你將一個柵欄操作傳遞給構造函數,這是一個Runnable,當成功的通過柵欄時會(在一個子線程)執行它,但在阻塞過程被釋放之前是不能執行的。

  CyclicBarrier的構造方法可以傳入參與的數量(也就是被柵欄攔截的線程的數量),也可以傳入一個Runnable對象。

    public CyclicBarrier(int parties) {
        this(parties, null);
    }
    public CyclicBarrier(int parties, Runnable barrierAction) {
        if (parties <= 0) throw new IllegalArgumentException();
        this.parties = parties;
        this.count = parties;
        this.barrierCommand = barrierAction;
    }

例如:

package cn.qlq.thread.tone;

import java.util.concurrent.CyclicBarrier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Administrator
 *
 */
public class Demo2 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo2.class);

    public static void main(String[] args) throws InterruptedException {
        final CyclicBarrier cyclicBarrier = new CyclicBarrier(2);
        for (int i = 0; i < 4; i++) {
            Thread.sleep(2 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                    try {
                        cyclicBarrier.await();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                }
            }).start();
        }
    }
}

結果:

18:08:00 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-1
18:08:02 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-0
18:08:04 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-3
18:08:06 [cn.qlq.thread.tone.Demo2]-[INFO] threadName -> Thread-2

  00的時候0線程到達柵欄進入阻塞,02的時候1線程到達柵欄,由於柵欄的參與者是2所以此時相當於所有線程到達柵欄,柵欄放開,然後柵欄被重置。

  04的時候2線程到達柵欄進入阻塞,06的時候3線程到達柵欄,由於柵欄的參與者是2所以此時相當於所有參與者線程到達柵欄,然後柵欄放開。

我們將柵欄的參與者改為5查看結果:

final CyclicBarrier cyclicBarrier = new CyclicBarrier(5);

結果:4個線程會阻塞到await方法處,而且JVM不會關閉,因為柵欄的參與者不夠5個所以被一直阻塞。

技術分享圖片

3.2 Exchanger

  Exchanger相當於一個兩方(Two-party)柵欄,各方在柵欄位置上交換數據。當兩方執行不對稱的操作時,Exchanger非常有用。例如:一個線程向緩沖區寫東西,另一個線程從緩沖區讀數據Exchanger相當於參與者只有兩個的CyclicBarrier。

  兩個線程會阻塞在exchanger.exchange方法上,泛型可以指定其交換的數據類型。

例如:兩個線程交換自己的線程名稱

package cn.qlq.thread.tone;

import java.util.concurrent.Exchanger;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * @author Administrator
 *
 */
public class Demo3 {
    private static final Logger LOGGER = LoggerFactory.getLogger(Demo3.class);

    public static void main(String[] args) throws InterruptedException {
        final Exchanger<String> exchanger = new Exchanger<String>();// 泛型指定交換的數據
        for (int i = 0; i < 4; i++) {
            Thread.sleep(2 * 1000);
            new Thread(new Runnable() {
                @Override
                public void run() {
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                    try {
                        String exchange = exchanger.exchange(Thread.currentThread().getName());
                        LOGGER.error("threadName -> {},exchange->{}", Thread.currentThread().getName(), exchange);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    LOGGER.info("threadName -> {}", Thread.currentThread().getName());
                }
            }).start();
        }
    }
}

結果:

18:28:33 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-0
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-1,exchange->Thread-0
18:28:35 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-0,exchange->Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-1
18:28:35 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-0
18:28:37 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-3,exchange->Thread-2
18:28:39 [cn.qlq.thread.tone.Demo3]-[ERROR] threadName -> Thread-2,exchange->Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-3
18:28:39 [cn.qlq.thread.tone.Demo3]-[INFO] threadName -> Thread-2

CountDownLatch 閉鎖、Semaphore信號量、Barrier柵欄