1. 程式人生 > >高併發第十一彈:J.U.C -AQS(AbstractQueuedSynchronizer) 元件:Lock,ReentrantLock,ReentrantReadWriteLock,StampedLock

高併發第十一彈:J.U.C -AQS(AbstractQueuedSynchronizer) 元件:Lock,ReentrantLock,ReentrantReadWriteLock,StampedLock

既然說到J.U.C 的AQS(AbstractQueuedSynchronizer)   不說 Lock 是不可能的.不過實話來說,一般 JKD8 以後我一般都不用Lock了.畢竟sychronized 的效率已經很高了.Lock在我的實際開發中的需求很少,但還是需要了解一下的.

JAVA的兩種鎖

ReentrantLock與synchronized的區別

  可重入性:兩者的鎖都是可重入的,差別不大,有執行緒進入鎖,計數器自增1,等下降為0時才可以釋放鎖

  鎖的實現:synchronized是基於JVM實現的(使用者很難見到,無法瞭解其實現),ReentrantLock是JDK實現的,其實就是我們敲程式碼實現的。

  效能區別:在最初的時候,二者的效能差別差很多,當synchronized引入了偏向鎖、輕量級鎖(自選鎖)後,二者的效能差別不大,官方推薦synchronized(寫法更容易、在優化時其實是借用了ReentrantLock的CAS技術,試圖在使用者態就把問題解決,避免進入核心態造成執行緒阻塞)

功能區別:

  (1)便利性:synchronized更便利,它是由編譯器保證加鎖與釋放。ReentrantLock是需要手動釋放鎖,所以為了避免忘記手工釋放鎖造成死鎖,所以最好在finally中宣告釋放鎖。

  (2)鎖的細粒度和靈活度,ReentrantLock優於synchronized 

ReentrantLock獨有的功能

  可以指定是公平鎖還是非公平鎖,sync只能是非公平鎖。(所謂公平鎖就是先等待的執行緒先獲得鎖)

  提供了一個Condition類,可以分組喚醒需要喚醒的執行緒。不像是synchronized要麼隨機喚醒一個執行緒,要麼全部喚醒。

  提供能夠中斷等待鎖的執行緒的機制,通過lock.lockInterruptibly()實現,這種機制 ReentrantLock是一種自選鎖,通過迴圈呼叫CAS操作來實現加鎖。效能比較好的原因是避免了進入核心態的阻塞狀態。

建議(純個人粗見):

  除非需要用Lock的3個獨有的功能,為了安全和省心一點還是用synchronized吧.最後會有一個 Lock和synchronized和Atomic的效能對比.也可以作為參考

那還是迴歸主題

怎麼使用ReentrantLock呢

  構造方法

建立一個 ReentrantLock的例項。

根據給定的公平政策建立一個 ReentrantLock的例項。 

//建立鎖
private final static Lock lock = new ReentrantLock();
//使用鎖
private static void method() {
    lock.lock();
    try {
       .......
    } finally {
        lock.unlock();
    }
}

基本方法

  • void lock()  獲得鎖
  • boolean 只有在呼叫時它不被另一個執行緒佔用才能獲取鎖。 
  • boolean 如果在給定的等待時間內沒有被另一個執行緒 佔用 ,並且當前執行緒尚未被 保留,則獲取該鎖( interrupted) 。 
  • 嘗試釋放此鎖。 

基本使用 上面也有了   看一下Condition的使用

Condition的使用

Condition因素出Object監視器方法( waitnotifynotifyAll )成不同的物件,以得到具有多個等待集的每個物件,通過將它們與使用任意的組合的效果Lock實現。 Lock替換synchronized方法和語句的使用, Condition取代了物件監視器方法的使用。

條件(也稱為條件佇列條件變數 )為一個執行緒暫停執行(“等待”)提供了一種方法,直到另一個執行緒通知某些狀態現在可能為真。 因為訪問此共享狀態資訊發生在不同的執行緒中,所以它必須被保護,因此某種形式的鎖與該條件相關聯。 等待條件的關鍵屬性是它原子地釋放相關的鎖並掛起當前執行緒,就像Object.wait

一個Condition例項本質上繫結到一個鎖。 要獲得特定Condition例項的Condition例項,請使用其newCondition()方法。

例如,假設我們有一個有限的緩衝區,它支援puttake方法。 如果在一個空的緩衝區嘗試一個take ,則執行緒將阻塞直到一個專案可用; 如果put試圖在一個完整的緩衝區,那麼執行緒將阻塞,直到空間變得可用。 我們希望在單獨的等待集中等待put執行緒和take執行緒,以便我們可以在緩衝區中的專案或空間可用的時候使用僅通知單個執行緒的優化。 這可以使用兩個Condition例項來實現。

class BoundedBuffer {
   final Lock lock = new ReentrantLock();
   final Condition notFull  = lock.newCondition(); 
   final Condition notEmpty = lock.newCondition(); 

   final Object[] items = new Object[100];
   int putptr, takeptr, count;

   public void put(Object x) throws InterruptedException {
     lock.lock(); try {
       while (count == items.length)
         notFull.await();
       items[putptr] = x;
       if (++putptr == items.length) putptr = 0;
       ++count;
       notEmpty.signal();
     } finally { lock.unlock(); }
   }

   public Object take() throws InterruptedException {
     lock.lock(); try {
       while (count == 0)
         notEmpty.await();
       Object x = items[takeptr];
       if (++takeptr == items.length) takeptr = 0;
       --count;
       notFull.signal();
       return x;
     } finally { lock.unlock(); }
   }
 } 

下面是一個更明晰的例子

public static void main(String[] args) {
    ReentrantLock reentrantLock = new ReentrantLock();
    Condition condition = reentrantLock.newCondition();//建立condition
    //執行緒1
    new Thread(() -> {
        try {
            reentrantLock.lock();
            log.info("wait signal"); // 1
            condition.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        log.info("get signal"); // 4
        reentrantLock.unlock();
    }).start();
    //執行緒2
    new Thread(() -> {
        reentrantLock.lock();
        log.info("get lock"); // 2
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        condition.signalAll();//傳送訊號
        log.info("send signal"); // 3
        reentrantLock.unlock();
    }).start();
}

輸出過程講解:

1、執行緒1呼叫了reentrantLock.lock(),執行緒進入AQS等待佇列,輸出1號log

2、接著呼叫了awiat方法,執行緒從AQS佇列中移除,鎖釋放,直接加入condition的等待佇列中

3、執行緒2因為執行緒1釋放了鎖,拿到了鎖,輸出2號log

4、執行緒2執行condition.signalAll()傳送訊號,輸出3號log

5、condition佇列中執行緒1的節點接收到訊號,從condition佇列中拿出來放入到了AQS的等待佇列,這時執行緒1並沒有被喚醒。

6、執行緒2呼叫unlock釋放鎖,因為AQS佇列中只有執行緒1,因此AQS釋放鎖按照從頭到尾的順序,喚醒執行緒1 7、執行緒1繼續執行,輸出4號log,並進行unlock操作。

ReentrantReadWriteLock的使用

在沒有任何讀寫鎖的時候才可以取得寫入鎖(悲觀讀取,容易寫執行緒飢餓),也就是說如果一直存在讀操作,那麼寫鎖一直在等待沒有讀的情況出現,這樣我的寫鎖就永遠也獲取不到,就會造成等待獲取寫鎖的執行緒飢餓。 平時使用的場景並不多。 

private final Map<String, Data> map = new TreeMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final Lock readLock = lock.readLock();//讀鎖
    private final Lock writeLock = lock.writeLock();//寫鎖

    //加讀鎖
    public Data get(String key) {
        readLock.lock();
        try {
            return map.get(key);
        } finally {
            readLock.unlock();
        }
    }
    //加寫鎖
    public Data put(String key, Data value) {
        writeLock.lock();
        try {
            return map.put(key, value);
        } finally {
            writeLock.unlock();
        }
    }

StampedLock

介紹: 一種基於能力的鎖,具有三種模式用於控制讀/寫訪問。 StampedLock的狀態由版本和模式組成。 鎖定採集方法返回一個表示和控制相對於鎖定狀態的訪問的印記; 這些方法的“嘗試”版本可能會返回特殊值為零以表示獲取訪問失敗。 鎖定釋放和轉換方法要求郵票作為引數,如果它們與鎖的狀態不匹配則失敗。 這三種模式是:
  • 寫作。 方法writeLock()可能阻止等待獨佔訪問,返回可以在方法unlockWrite(long)中使用的郵票來釋放鎖定。 不定時的和定時版本tryWriteLock ,還提供。 當鎖保持寫入模式時,不能獲得讀取鎖定,並且所有樂觀讀取驗證都將失敗。
  • 讀。 方法readLock()可能阻止等待非獨佔訪問,返回可用於方法unlockRead(long)釋放鎖的戳記 。 不定時的和定時版本tryReadLock ,還提供。
  • 樂觀閱讀 方法tryOptimisticRead()只有當鎖當前未保持在寫入模式時才返回非零標記。 方法validate(long)返回true,如果在獲取給定的郵票時尚未在寫入模式中獲取鎖定。 這種模式可以被認為是一個非常弱的版本的讀鎖,可以隨時由作家打破。 對簡單的只讀程式碼段使用樂觀模式通常會減少爭用並提高吞吐量。 然而,其使用本質上是脆弱的。 樂觀閱讀部分只能讀取欄位並將其儲存在區域性變數中,以供後驗證使用。 以樂觀模式讀取的欄位可能會非常不一致,因此只有在熟悉資料表示以檢查一致性和/或重複呼叫方法validate()時,使用情況才適用。 例如,當首次讀取物件或陣列引用,然後訪問其欄位,元素或方法之一時,通常需要這樣的步驟。

JDK上提供的例子

class Point {
   private double x, y;
   private final StampedLock sl = new StampedLock();
   void move(double deltaX, double deltaY) { // an exclusively locked method
     long stamp = sl.writeLock();
     try {
       x += deltaX;
       y += deltaY;
     } finally {
       sl.unlockWrite(stamp);
     }
   }
  //下面看看樂觀讀鎖案例
   double distanceFromOrigin() { // A read-only method
     long stamp = sl.tryOptimisticRead(); //獲得一個樂觀讀鎖
     double currentX = x, currentY = y; //將兩個欄位讀入本地區域性變數
     if (!sl.validate(stamp)) { //檢查發出樂觀讀鎖後同時是否有其他寫鎖發生?
        stamp = sl.readLock(); //如果沒有,我們再次獲得一個讀悲觀鎖
        try {
          currentX = x; // 將兩個欄位讀入本地區域性變數
          currentY = y; // 將兩個欄位讀入本地區域性變數
        } finally {
           sl.unlockRead(stamp);
        }
     }
     return Math.sqrt(currentX * currentX + currentY * currentY);
   }
//下面是悲觀讀鎖案例
   void moveIfAtOrigin(double newX, double newY) { // upgrade
     // Could instead start with optimistic, not read mode
     long stamp = sl.readLock();
     try {
       while (x == 0.0 && y == 0.0) { //迴圈,檢查當前狀態是否符合
         long ws = sl.tryConvertToWriteLock(stamp); //將讀鎖轉為寫鎖
         if (ws != 0L) { //這是確認轉為寫鎖是否成功
           stamp = ws; //如果成功 替換票據
           x = newX; //進行狀態改變
           y = newY; //進行狀態改變
           break;
         }
         else { //如果不能成功轉換為寫鎖
           sl.unlockRead(stamp); //我們顯式釋放讀鎖
           stamp = sl.writeLock(); //顯式直接進行寫鎖 然後再通過迴圈再試
         }
       }
     } finally {
       sl.unlock(stamp); //釋放讀鎖或寫鎖
     }
   }
 }
  1. synchronized是在JVM層面上實現的,不但可以通過一些監控工具監控synchronized的鎖定,而且在程式碼執行時出現異常,JVM會自動釋放鎖定;
  2. ReentrantLock、ReentrantReadWriteLock,、StampedLock都是物件層面的鎖定,要保證鎖定一定會被釋放,就必須將unLock()放到finally{}中;
  3. StampedLock 對吞吐量有巨大的改進,特別是在讀執行緒越來越多的場景下;
  4. StampedLock有一個複雜的API,對於加鎖操作,很容易誤用其他方法;
  5. 當只有少量競爭者的時候,synchronized是一個很好的通用的鎖實現;
  6. 當執行緒增長能夠預估,ReentrantLock是一個很好的通用的鎖實現;

StampedLock 可以說是Lock的一個很好的補充,吞吐量以及效能上的提升足以打動很多人了,但並不是說要替代之前Lock的東西,畢竟他還是有些應用場景的,起碼API比StampedLock容易入手.

鎖的選擇

1、當只有少量競爭者,使用synchronized

2、競爭者不少但是執行緒增長的趨勢是能預估的,使用ReetrantLock

3、synchronized不會造成死鎖,jvm會自動釋放死鎖。 

下面有一個例子 關閉 synchronized ,reentrantLock,Atomic的效能對比

package com.rong.juc;

import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 
 * @ClassName: ReentrantLockDemo
 * @Description:TODO(這裡用一句話描述這個類的作用)
 * @author: rongbo
 * @date: 2018年9月23日 下午10:57:27
 * 
 *
 */
public class ReentrantLockTest {
    public static void test(int round, int threadNum, CyclicBarrier cyclicBarrier) {
        new SyncTest("Sync", round, threadNum, cyclicBarrier).testTime();
        new LockTest("Lock", round, threadNum, cyclicBarrier).testTime();
        new AtomicTest("Atom", round, threadNum, cyclicBarrier).testTime();
    }

    public static void main(String args[]) {

        for (int i = 0; i < 5; i++) {
            int round = 10000 * (i + 1);
            int threadNum = 5 * (i + 1);
            CyclicBarrier cb = new CyclicBarrier(threadNum * 2 + 1);
            System.out.println("==========================");
            System.out.println("round:" + round + " thread:" + threadNum);
            test(round, threadNum, cb);

        }
    }
}

class SyncTest extends TestTemplate {
    public SyncTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        super(_id, _round, _threadNum, _cb);
    }

    @Override
    /**
     * synchronized關鍵字不在方法簽名裡面,所以不涉及過載問題
     */
    synchronized long getValue() {
        return super.countValue;
    }

    @Override
    synchronized void sumValue() {
        super.countValue += preInit[index++ % round];
    }
}

class LockTest extends TestTemplate {
    ReentrantLock lock = new ReentrantLock();

    public LockTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        super(_id, _round, _threadNum, _cb);
    }

    /**
     * synchronized關鍵字不在方法簽名裡面,所以不涉及過載問題
     */
    @Override
    long getValue() {
        try {
            lock.lock();
            return super.countValue;
        } finally {
            lock.unlock();
        }
    }

    @Override
    void sumValue() {
        try {
            lock.lock();
            super.countValue += preInit[index++ % round];
        } finally {
            lock.unlock();
        }
    }
}

class AtomicTest extends TestTemplate {
    public AtomicTest(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        super(_id, _round, _threadNum, _cb);
    }

    @Override
    /**
     * synchronized關鍵字不在方法簽名裡面,所以不涉及過載問題
     */
    long getValue() {
        return super.countValueAtmoic.get();
    }

    @Override
    void sumValue() {
        super.countValueAtmoic.addAndGet(super.preInit[indexAtomic.get() % round]);
    }
}

abstract class TestTemplate {
    private String id;
    protected int round;
    private int threadNum;
    protected long countValue;
    protected AtomicLong countValueAtmoic = new AtomicLong(0);
    protected int[] preInit;
    protected int index;
    protected AtomicInteger indexAtomic = new AtomicInteger(0);
    Random r = new Random(47);
    // 任務柵欄,同批任務,先到達wait的任務掛起,一直等到全部任務到達制定的wait地點後,才能全部喚醒,繼續執行
    private CyclicBarrier cb;

    public TestTemplate(String _id, int _round, int _threadNum, CyclicBarrier _cb) {
        this.id = _id;
        this.round = _round;
        this.threadNum = _threadNum;
        cb = _cb;
        preInit = new int[round];
        for (int i = 0; i < preInit.length; i++) {
            preInit[i] = r.nextInt(100);
        }
    }

    abstract void sumValue();

    /*
     * 對long的操作是非原子的,原子操作只針對32位 long是64位,底層操作的時候分2個32位讀寫,因此不是執行緒安全
     */
    abstract long getValue();

    public void testTime() {
        ExecutorService se = Executors.newCachedThreadPool();
        long start = System.nanoTime();
        // 同時開啟2*ThreadNum個數的讀寫執行緒
        for (int i = 0; i < threadNum; i++) {
            se.execute(new Runnable() {
                public void run() {
                    for (int i = 0; i < round; i++) {
                        sumValue();
                    }

                    // 每個執行緒執行完同步方法後就等待
                    try {
                        cb.await();
                    } catch (InterruptedException | BrokenBarrierException e) {
                        e.printStackTrace();
                    }

                }
            });
            se.execute(new Runnable() {
                public void run() {

                    getValue();
                    try {
                        // 每個執行緒執行完同步方法後就等待
                        cb.await();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }

                }
            });
        }

        try {
            // 當前統計執行緒也wait,所以CyclicBarrier的初始值是threadNum*2+1
            cb.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
        // 所有執行緒執行完成之後,才會跑到這一步
        long duration = System.nanoTime() - start;
        System.out.println(id + " = " + duration);

    }

}
View Code

老電腦 ,效能很差

結果:

==========================round:10000 thread:5Sync = 4578771Lock = 6079408Atom = 2358938==========================round:20000 thread:10Sync = 10253723Lock = 6668266Atom = 3977932==========================round:30000 thread:15Sync = 19530498Lock = 13254122Atom = 11142416==========================round:40000 thread:20Sync = 31596091Lock = 24663350Atom = 18504161==========================round:50000 thread:25Sync = 55158877Lock = 36521455Atom = 32352693

StampedLock 和ReentrantLock的對比

就剛剛 個例子 自己試一下吧..還是需要自動動手後才 記得住的 Q_Q