1. 程式人生 > >java併發-讀寫鎖&Park&Condition介面

java併發-讀寫鎖&Park&Condition介面

ReentrantReadWriteLock

核心

讀狀態取state的高16位,寫狀態取state的低16位,來解決一個state需要標識read和write的狀態。
寫Lock 排他、獨佔式的
讀Lock 共享式的

示例

/**
 * describe:
 * E-mail:[email protected]  date:2018/12/16
 *
 * @Since 0.0.1
 */
public class ReentrantReadWriteLockTest {

    static ReentrantReadWriteLock reentrantReadWriteLock =
new ReentrantReadWriteLock(); static Lock readLock = reentrantReadWriteLock.readLock(); static Lock writeLock = reentrantReadWriteLock.writeLock(); static class R extends Thread { @Override public void run() { readLock.lock(); System.out.println(
"讀"+Thread.currentThread().getName()); readLock.unlock(); } } static class W extends Thread { @Override public void run() { writeLock.lock(); System.out.println("寫"+Thread.currentThread().getName()); writeLock.unlock();
} } public static void main(String[] args) { for (int i = 0; i < 10; i++) { if (i%2 == 0){ Thread r = new R(); r.start(); }else { Thread w = new W(); w.start(); } } } }

降級鎖(同一執行緒,資料的更改是可見的)

含義就是在寫lock的時候,可以允許去獲取讀鎖,此時沒有做降級的時候,讀鎖是“阻塞”的,但是做了降級後,讀鎖可以被獲取到,
讀鎖釋放,那麼該執行緒就從寫鎖執行緒變成了讀鎖執行緒了。

    public void processData() {
        readLock.lock();
        if (!update) {
            // 必須先釋放讀鎖
            readLock.unlock();
            // 鎖降級從寫鎖獲取到開始
            writeLock.lock();
            try {
                if (!update) {
                // 準備資料的流程(略)
                update = true;
            }
                readLock.lock();
            } finally {
                 writeLock.unlock();
            }
             // 鎖降級完成,寫鎖降級為讀鎖
        }
        try {
            // 使用資料的流程(略)
         } finally {
             readLock.unlock();
         }
    }

LockSupport類

  • 呼叫Unsafe的park(停車)方法

LockSupport定義了一組以park開頭的方法用來阻塞當前執行緒,以及unpark(Thread thread)
方法來喚醒一個被阻塞的執行緒
相當於wait和notify

Condition

在這裡插入圖片描述

Condition 操作的前提是,當前執行緒是同步佇列中的head,獲取到了鎖,才能進行等待/喚醒操作

  • Condition 維護了一個Node等待佇列

在這裡插入圖片描述

  • await操作

會將該執行緒從Syn佇列中移動到Condition等待佇列的隊尾
在這裡插入圖片描述

  • single操作

會將該執行緒從Condition中移動到Syn的同步佇列的隊尾
在這裡插入圖片描述

Condition示例

此例中實現了偽消費者,是排他鎖的實現
當陣列長度滿的時候,remove將被喚醒,清空陣列
當陣列容量小於長度時,remove將阻塞,add方法被喚醒

注意
  • 這裡要注意喚醒後的執行順序,是邏輯順序往下執行,所以必須要double check,此處喚醒是喚醒了所有等待的執行緒,有可能出現生產者喚醒生產者的情況
  • 疑問
    add()方法已經lock到鎖了,為什麼await後,remove可以獲取鎖,在add沒有unlock之前是無法獲取鎖的???

原因:
await()–>fullyRelease()–>release()–>tryRelease
release方法如下

public final boolean release(long arg) {
       if (tryRelease(arg)) {
           Node h = head;
           if (h != null && h.waitStatus != 0)
               unparkSuccessor(h);
           return true;
       }
       return false;
   }

在single的時候,或有enqueue操作,我們的lock方法在無法獲取鎖的情況下就會enqueue操作,進入同步佇列
so it’s ok…

/**
 * describe:
 * E-mail:[email protected]  date:2018/12/16
 *
 * @Since 0.0.1
 */
public class ConditionWaitAndNotify {
    //排它鎖
    private Lock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();

    private Object[] ts;

    private int index;

    public ConditionWaitAndNotify(int count) {
        if (count <= 0) throw new IllegalArgumentException("count must not be null");
        ts = new Object[count];
    }

    public void add() {
        try {
            lock.lock();
            //one check
            if (index == ts.length) {
                condition.await();
            }
            //double check
            if (index == ts.length) {
                return;
            }
            ts[index] = new Object();
            index++;
            System.out.println("add" + index);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void remove() {
        try {
            lock.lock();
            //one check
            if (index < ts.length) {
                condition.await();
            }
            //double check
            if (index < ts.length){
                return;
            }
            index = 0;
            System.out.println("remove" + index);
            condition.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    static Thread createAdd(ConditionWaitAndNotify conditionWaitAndNotify){
        return new Thread(() -> {
            while (true){
                conditionWaitAndNotify.add();
                try {
                    //Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
    static Thread createRemove(ConditionWaitAndNotify conditionWaitAndNotify){
        return new Thread(() -> {
            while (true) {
                conditionWaitAndNotify.remove();
                try {
                    //Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }


    public static void main(String[] args) {
        ConditionWaitAndNotify conditionWaitAndNotify = new ConditionWaitAndNotify(3);
        Thread t1 = createAdd(conditionWaitAndNotify);
        Thread t11 = createAdd(conditionWaitAndNotify);
        Thread t2 = createRemove(conditionWaitAndNotify);
        Thread t22 = createRemove(conditionWaitAndNotify);
        t1.start();
        t11.start();
        t2.start();
        t22.start();
    }
}

GitHub主頁