1. 程式人生 > >Java ReentranLock同步鎖和Condition條件的使用

Java ReentranLock同步鎖和Condition條件的使用

java.util.concurrent.lock 中的 Lock 框架是鎖定的一個抽象,它允許把鎖定的實現作為 Java 類,而不是作為語言的特性來實現。這就為 Lock 的多種實現留下了空間,各種實現可能有不同的排程演算法、效能特性或者鎖定語義。 ReentrantLock 類實現了 Lock ,它擁有與 synchronized 相同的併發性和記憶體語義,但是添加了類似鎖投票、定時鎖等候和可中斷鎖等候的一些特性。此外,它還提供了在激烈爭用情況下更佳的效能。(換句話說,當許多執行緒都想訪問共享資源時,JVM 可以花更少的時候來排程執行緒,把更多時間用在執行執行緒上。)

ReentranLock與synchronized比較

相同:

  • ReentrantLock提供了synchronized類似的功能和記憶體語義。

不同:

  • ReentrantLock功能性方面更全面,比如時間鎖等候,可中斷鎖等候,鎖投票等,因此更有擴充套件性。在多個條件變數和高度競爭鎖的地方,用ReentrantLock更合適,ReentrantLock還提供了Condition,對執行緒的等待和喚醒等操作更加靈活,一個ReentrantLock可以有多個Condition例項,所以更有擴充套件性。

  • ReentrantLock 的效能比synchronized會好點。

  • ReentrantLock提供了可輪詢的鎖請求,他可以嘗試的去取得鎖,如果取得成功則繼續處理,取得不成功,可以等下次執行的時候處理,所以不容易產生死鎖,而synchronized則一旦進入鎖請求要麼成功,要麼一直阻塞,所以更容易產生死鎖。

ReentranLock的使用

實現可輪詢的鎖請求

在內部鎖中,死鎖是致命的——唯一的恢復方法是重新啟動程式,唯一的預防方法是在構建程式時不要出錯。而可輪詢的鎖獲取模式具有更完善的錯誤恢復機制,可以規避死鎖的發生。

如果你不能獲得所有需要的鎖,那麼使用可輪詢的獲取方式使你能夠重新拿到控制權,它會釋放你已經獲得的這些鎖,然後再重新嘗試。可輪詢的鎖獲取模式,由tryLock()方法實現。此方法僅在呼叫時鎖為空閒狀態才獲取該鎖。如果鎖可用,則獲取鎖,並立即返回值true。如果鎖不可用,則此方法將立即返回值false。此方法的典型使用語句如下:

Lock lock = ...; 
// 嘗試獲取鎖請求,獲取不到鎖時也不會產生死鎖
if (lock.tryLock()) { try { // manipulate protected state } finally { lock.unlock(); } } else { // perform alternative actions }

實現可定時的鎖請求

當使用內部鎖時,一旦開始請求,鎖就不能停止了,所以內部鎖給實現具有時限的活動帶來了風險。為了解決這一問題,可以使用定時鎖。當具有時限的活動呼叫了阻塞方法,定時鎖能夠在時間預算內設定相應的超時。如果活動在期待的時間內沒能獲得結果,定時鎖能使程式提前返回。可定時的鎖獲取模式,由tryLock(long, TimeUnit)方法實現。

實現可中斷的鎖獲取請求

可中斷的鎖獲取操作允許在可取消的活動中使用。lockInterruptibly()方法能夠使你獲得鎖的時候響應中斷。

ReentranLock需要注意的地方

  • lock 必須在 finally 塊中釋放。否則,如果受保護的程式碼將丟擲異常,鎖就有可能永遠得不到釋放!這一點區別看起來可能沒什麼,但是實際上,它極為重要。忘記在 finally 塊中釋放鎖,可能會在程式中留下一個定時炸彈,當有一天炸彈爆炸時,您要花費很大力氣才有找到源頭在哪。而使用同步,JVM 將確保鎖會獲得自動釋放

  • 當 JVM 用 synchronized 管理鎖定請求和釋放時,JVM 在生成執行緒轉儲時能夠包括鎖定資訊。這些對除錯非常有價值,因為它們能標識死鎖或者其他異常行為的來源。 Lock 類只是普通的類,JVM 不知道具體哪個執行緒擁有 Lock 物件。

Condition條件的使用

條件變數很大一個程度上是為了解決Object.wait/notify/notifyAll難以使用的問題。

Condition也稱為條件佇列 或條件變數)為執行緒提供了一個含義,以便在某個狀態條件現在可能為 true 的另一個執行緒通知它之前,一直掛起該執行緒(即讓其“等待”)。

因為訪問此共享狀態資訊發生在不同的執行緒中,所以它必須受保護,因此要將某種形式的鎖與該條件相關聯。等待提供一個條件的主要屬性是:以原子方式 釋放相關的鎖,並掛起當前執行緒,就像 Object.wait 做的那樣。

上述API說明表明條件變數需要與鎖繫結,而且多個Condition需要繫結到同一鎖上。前面的Lock中提到,獲取一個條件變數的方法是Lock.newCondition()。

void await() throws InterruptedException;

void awaitUninterruptibly();

long awaitNanos(long nanosTimeout) throws InterruptedException;

boolean await(long time, TimeUnit unit) throws InterruptedException;

boolean awaitUntil(Date deadline) throws InterruptedException;

void signal();

void signalAll();

以上是Condition介面定義的方法,await()對應於Object.wait,signal對應於Object.notify,signalAll對應於Object.notifyAll。特別說明的是Condition的介面改變名稱就是為了避免與Object中的wait/notify/notifyAll的語義和使用上混淆,因為Condition同樣有wait/notify/notifyAll方法。

每一個Lock可以有任意資料的Condition物件,Condition是與Lock繫結的,所以就有Lock的公平性特性:如果是公平鎖,執行緒為按照FIFO的順序從Condition.await中釋放,如果是非公平鎖,那麼後續的鎖競爭就不保證FIFO順序了。

使用Condition和Lock實現阻塞佇列

public class BlockingQueue<T> {

    private T[] mItems;

    // 同步鎖
    private final Lock mLock = new ReentrantLock();
    // 佇列滿時的條件
    private Condition mNotFull = mLock.newCondition();
    // 佇列為空時條件
    private Condition mNotEmpty = mLock.newCondition();

    private int mCount;
    private int mHeadIdx;
    private int mTailIdx;

    private static final int DEFAULT_QUEUE_SIZE = 10;

    public BlockingQueue() {
        this(DEFAULT_QUEUE_SIZE);
    }

    public BlockingQueue(int size) {
        mItems = (T[]) new Object[size];
    }

    // 放入元素
    public void put(T item) throws InterruptedException {
        // 獲取讀寫鎖
        mLock.lock();
        try {
            // 此處會進行阻塞,直到有元素取出
            while (mCount == getCapacity()) {
                // 掛起,並釋放鎖,條件滿足時,重新獲取鎖
                mNotFull.await();
            }
            mItems[mTailIdx] = item;
            // 填滿元素
            if (++mTailIdx == getCapacity()) {
                mTailIdx = 0;
            }
            ++mCount;
            // 對阻塞住的take()進行喚醒
            mNotEmpty.signalAll();
        } finally {
            mLock.unlock();
        }
    }

    // 取出元素
    public T take() throws InterruptedException {
        mLock.lock();
        try {
            while (mCount == 0) {
                 // 掛起,並釋放鎖,條件滿足時,重新獲取鎖
                mNotEmpty.await();
            }
            T item = mItems[mHeadIdx];
            mItems[mHeadIdx] = null; // for GC
            if (++mHeadIdx == getCapacity()) {
                mHeadIdx = 0;
            }
            --mCount;
            mNotEmpty.signalAll();
            return item;
        } finally {
            mLock.unlock();
        }
    }

    public int getCapacity() {
        return mItems.length;
    }

}

測試

public class Main {

    private static BlockingQueue<String> sQueue;

    public static void main(String[] args) {
        sQueue = new BlockingQueue<>();
        new Thread(() -> {
            try {
                // 阻塞5秒
                Thread.sleep(5000);
                sQueue.put("Hello");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }).start();

        // 從阻塞佇列中取值
        try {
            String item = sQueue.take();
            System.out.println(item);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

await() 操作

上一節中說過多次ReentrantLock是獨佔鎖,一個執行緒拿到鎖後如果不釋放,那麼另外一個執行緒肯定是拿不到鎖,所以在lock.lock()和lock.unlock()之間可能有一次釋放鎖的操作(同樣也必然還有一次獲取鎖的操作)。

我們再回頭看程式碼,不管take()還是put(),在進入lock.lock()後唯一可能釋放鎖的操作就是await()了。也就是說await()操作實際上就是釋放鎖,然後掛起執行緒,一旦條件滿足就被喚醒,再次獲取鎖!


public final void await() throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    Node node = addConditionWaiter();
    int savedState = fullyRelease(node);
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {
        LockSupport.park(this);
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
        interruptMode = REINTERRUPT;
    if (node.nextWaiter != null)
        unlinkCancelledWaiters();
    if (interruptMode != 0)
        reportInterruptAfterWait(interruptMode);
}

上面是await()的程式碼片段。上一節中說過,AQS在獲取鎖的時候需要有一個CHL的FIFO佇列,所以對於一個Condition.await()而言,如果釋放了鎖,要想再一次獲取鎖那麼就需要進入佇列,等待被通知獲取鎖。完整的await()操作是安裝如下步驟進行的:

  1. 將當前執行緒加入Condition鎖佇列。特別說明的是,這裡不同於AQS的佇列,這裡進入的是Condition的FIFO佇列。後面會具體談到此結構。進行2。

  2. 釋放鎖。這裡可以看到將鎖釋放了,否則別的執行緒就無法拿到鎖而發生死鎖。進行3。

  3. 自旋(while)掛起,直到被喚醒或者超時或者CACELLED等。進行4。
    獲取鎖(acquireQueued)。並將自己從Condition的FIFO佇列中釋放,表明自己不再需要鎖(我已經拿到鎖了)。

這裡再回頭介紹Condition的資料結構。我們知道一個Condition可以在多個地方被await*(),那麼就需要一個FIFO的結構將這些Condition串聯起來,然後根據需要喚醒一個或者多個(通常是所有)。所以在Condition內部就需要一個FIFO的佇列。

signal/signalAll 操作

await()清楚了,現在再來看signal/signalAll就容易多了。按照signal/signalAll的需求,就是要將Condition.await()中FIFO佇列中第一個Node喚醒(或者全部Node)喚醒。儘管所有Node可能都被喚醒,但是要知道的是仍然只有一個執行緒能夠拿到鎖,其它沒有拿到鎖的執行緒仍然需要自旋等待,就上上面提到的第4步(acquireQueued)。


private void doSignal(Node first) {
    do {
        if ( (firstWaiter = first.nextWaiter) == null)
            lastWaiter = null;
        first.nextWaiter = null;
    } while (!transferForSignal(first) &&
             (first = firstWaiter) != null);
}

private void doSignalAll(Node first) {
    lastWaiter = firstWaiter  = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);
        first = next;
    } while (first != null);
}

上面的程式碼很容易看出來,signal就是喚醒Condition佇列中的第一個非CANCELLED節點執行緒,而signalAll就是喚醒所有非CANCELLED節點執行緒。當然了遇到CANCELLED執行緒就需要將其從FIFO佇列中剔除。