1. 程式人生 > >J.U.C併發包(1)

J.U.C併發包(1)

J.U.C併發包(1)

AbstractQueuedSynchronizer

AbstractQueuedSynchronizer是JUC併發包中鎖的底層支援,AbstractQueuedSynchronizer是抽象同步佇列,簡稱AQS,是實現同步器的基礎元件,併發包中鎖的實現底層就是使用AQS實現,

  • 從類圖的關係可以看到AQS是一個FIFO的雙向佇列,內部通過節點head 和 tail 記錄隊首和隊尾元素,佇列元素型別為Node。其中Node中Thread變數用來存放進入AQS佇列裡面的執行緒
  • Node 節點內部SHARED用來標記該執行緒是獲取共享資源時候被阻塞掛起來後放入AQS佇列,
  • EXCLUSIVE標記執行緒是獲取獨佔資源時候被掛起後放入AQS佇列;
  • waitStatus記錄當前執行緒等待狀態,分別為CANCELLED(執行緒被取消了),SIGNAL(執行緒需要被喚醒),CONDITION(執行緒在條件佇列裡面等待),PROPAGATE(釋放共享資源時候需要通知其他節點);
  • AQS中維持了一個單一的狀態資訊state,可以通過getState,setState,compareAndSetState 函式修改其值;對於ReentrantLock 的實現來說,state 可以用來表示當前執行緒獲取鎖的可重入次數;
  • pre記錄當前節點的前驅節點,next記錄當前節點後繼節點
  • 呼叫acquire(int arg)方法獲取獨佔資源,呼叫release(int arg)方法釋放資源;

具體思路:

  • 當多個執行緒同時呼叫 lock.lock() 獲取鎖的時候,同時只有一個執行緒獲取到了該鎖,其他執行緒會被轉換為 Node 節點插入到 lock 鎖對應的 AQS 阻塞佇列裡面,並做自旋 CAS 嘗試獲取鎖,前提是head的直接後繼;
  • 如果獲取到鎖的執行緒又呼叫了對應的條件變數的 await() 方法,則該執行緒會釋放獲取到的鎖,並被轉換為 Node 節點插入到條件變數對應的條件佇列裡面;
  • 這時候因為呼叫 lock.lock() 方法被阻塞到 AQS 佇列裡面的一個執行緒會獲取到被釋放的鎖,如果該執行緒也呼叫了條件變數的 await()方法則該執行緒也會被放入條件變數的條件佇列;
  • 當另外一個執行緒呼叫了條件變數的 signal() 或者 signalAll() 方法時候,會把條件佇列裡面的一個或者全部 Node 節點移動到 AQS 的阻塞佇列裡面,等待時機獲取鎖。

CountDownLatch

他是一個同步輔助類,可以實現類似阻塞當前執行緒的功能,使用了給定的計數器進行初始化,該計數器操作是原子操作,同一時刻只能有一個執行緒操作該計數器。

如上圖中,TA執行緒由於await()方法被阻塞,除非前面的執行緒呼叫countDown()方法,當計數器為0,TA就可以繼續往下執行。計數器不可重置。

        private final static int threadCount = 200;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final CountDownLatch countDownLatch = new CountDownLatch(threadCount);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    test(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await();
        log.info("finish");
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        Thread.sleep(100);
        log.info("{}", threadNum);
        Thread.sleep(100);
    }

Semaphore

他可以控制某個資源可以被多少個執行緒同時訪問,使用Semaphore管理必須要先獲取一個許可,執行完畢後釋放一個許可,後面的執行緒才能繼續訪問,程式碼演示:

@Slf4j
public class SemaphoreExample1 {

    private final static int threadCount = 20;

    public static void main(String[] args) throws Exception {

        ExecutorService exec = Executors.newCachedThreadPool();

        final Semaphore semaphore = new Semaphore(3);

        for (int i = 0; i < threadCount; i++) {
            final int threadNum = i;
            exec.execute(() -> {
                try {
                    semaphore.acquire(); // 獲取一個許可
                    test(threadNum);
                    semaphore.release(); // 釋放一個許可
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        exec.shutdown();
    }

    private static void test(int threadNum) throws Exception {
        log.info("{}", threadNum);
        Thread.sleep(1000);
    }
}

CyclicBarrier

他是一個同步輔助類,允許一組執行緒相互等待,直到到達某個公共的屏障點,commonBarrierPoint,CyclicBarrier也具有一個計數器,當計數器達到設定的值,被await()方法阻塞的值會被喚醒,繼續執行後續的操作,計數器可以被重置,適合併發情況下需要合併計算的場景。

演示程式碼如下:

@Slf4j
public class CyclicBarrierExample1 {
    private static Logger log = LoggerFactory.getLogger(CyclicBarrierExample1.class);

    private static CyclicBarrier barrier = new CyclicBarrier(5);

    public static void main(String[] args) throws Exception {

        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 0; i < 10; i++) {
            final int threadNum = i;
            Thread.sleep(1000);
            executor.execute(() -> {
                try {
                    race(threadNum);
                } catch (Exception e) {
                    log.error("exception", e);
                }
            });
        }
        executor.shutdown();
    }

    private static void race(int threadNum) throws Exception {
        Thread.sleep(1000);
        log.info("{} is ready", threadNum);
        barrier.await();
        log.info("{} continue", threadNum);
    }
}

console:



ReentrantLock

 

ReentrantLock(可重入鎖)和Synchronize鎖的區別

  • 可重入性:二者都是重入鎖
  • 鎖的實現:ReentrantLock通過jdk實現,synchronize是通過jvm實現,但注意ReentrantLock需要釋放鎖,而synchronize不需要釋放鎖,由jvm管理,也就是說synchronize不會產生死鎖,而ReentrantLock可能產生死鎖。
  • 效能的區別:synchronize未做優化前,ReentrantLock優於synchronize,但synchronize在經過偏向鎖,輕量級鎖優化後效能就差不多了
  • 功能區別:程式碼簡潔synchronize優於ReentrantLock,鎖的細粒度和靈活度ReentrantLock更好。

ReentrantLock獨有功能

  • 可指定公平鎖還是非公平鎖:公平鎖(先等待的就先獲取鎖);
  • 提供了condition類,可以分組喚醒需要喚醒的執行緒;
  • 提供鎖的打斷機制,lock.lockInterruptibly()。

ReentrantLock是一種自選鎖,內部迴圈使用CAS操作實現加鎖

ReentrantReadWriteLock

ReentrantReadWriteLock是讀寫鎖,維護了一對鎖,一個讀鎖,一個寫鎖,通過實現ReadWriteLock介面實現了readLock()方法和writeLock()方法。適用於多執行緒情況下的讀寫操作,但是要注意如果讀操作過於頻繁可能會導致寫鎖飢餓。

StampLock

StampLock有三種控制鎖的方式:寫,讀和樂觀讀,StampLock會生成票據。

  • 樂觀讀:樂觀的認為寫入和讀取同時發生的概率很少,因此不悲觀的使用讀取鎖定,程式可以檢視讀取資料時候遭到寫入執行的變更之後,大幅提升程式的效能。
  • 樂觀鎖和悲觀鎖例項如下:
public class LockExample4 {

    class Point {
        private double x, y;
        private final StampedLock sl = new StampedLock();

        void move(double deltaX, double deltaY) { // an exclusively locked method
            long stamp = sl.writeLock();
            try {
                x += deltaX;
                y += deltaY;
            } finally {
                sl.unlockWrite(stamp);
            }
        }

        //下面看看樂觀讀鎖案例
        double distanceFromOrigin() { // A read-only method
            long stamp = sl.tryOptimisticRead(); //獲得一個樂觀讀鎖
            double currentX = x, currentY = y;  //將兩個欄位讀入本地區域性變數
            if (!sl.validate(stamp)) { //檢查發出樂觀讀鎖後同時是否有其他寫鎖發生?
                stamp = sl.readLock();  //如果沒有,我們再次獲得一個讀悲觀鎖
                try {
                    currentX = x; // 將兩個欄位讀入本地區域性變數
                    currentY = y; // 將兩個欄位讀入本地區域性變數
                } finally {
                    sl.unlockRead(stamp);
                }
            }
            return Math.sqrt(currentX * currentX + currentY * currentY);
        }

        //下面是悲觀讀鎖案例
        void moveIfAtOrigin(double newX, double newY) { // upgrade
            // Could instead start with optimistic, not read mode
            long stamp = sl.readLock();
            try {
                while (x == 0.0 && y == 0.0) { //迴圈,檢查當前狀態是否符合
                    long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉為寫鎖
                    if (ws != 0L) { //這是確認轉為寫鎖是否成功
                        stamp = ws; //如果成功 替換票據
                        x = newX; //進行狀態改變
                        y = newY;  //進行狀態改變
                        break;
                    } else { //如果不能成功轉換為寫鎖
                        sl.unlockRead(stamp);  //我們顯式釋放讀鎖
                        stamp = sl.writeLock();  //顯式直接進行寫鎖 然後再通過迴圈再試
                    }
                }
            } finally {
                sl.unlock(stamp); //釋放讀鎖或寫鎖
            }
        }
    }
}