1. 程式人生 > >java併發包詳解(jdk7)

java併發包詳解(jdk7)

在此對java併發包做一個大致總結,如有錯誤,請指正。
juc包的總體結構大致如下
這裡寫圖片描述
外層框架主要有Lock(ReentrantLock、ReadWriteLock等)、同步器(semaphores等)、阻塞佇列(BlockingQueue等)、Executor(執行緒池)、併發容器(ConcurrentHashMap等)、還有Fork/Join框架;
內層有AQS(AbstractQueuedSynchronizer類,鎖功能都由他實現)、非阻塞資料結構、原子變數類(AtomicInteger等無鎖執行緒安全類)三種。
底層就實現是volatile和CAS。整個併發包其實都是由這兩種思想構成的。

volatile

理解volatile特性的一個好方法是把對volatile變數的單個讀/寫,看成是使用同一個鎖對這
些單個讀/寫操作做了同步。一個volatile變數的單個讀/寫操作,與一個普通變數的讀/寫操作都
是使用同一個鎖來同步,它們之間的執行效果相同。
volatile具有的特性。
可見性。對一個volatile變數的讀,總是能看到(任意執行緒)對這個volatile變數最後的寫
入。
原子性:對任意單個volatile變數的讀/寫具有原子性,但類似於volatile++這種複合操作不
具有原子性。

volatile讀的記憶體語義如下。
當讀一個volatile變數時,JMM會把該執行緒對應的本地記憶體置為無效。執行緒接下來將從主
記憶體中讀取共享變數。
volatile寫的記憶體語義如下。
當寫一個volatile變數時,JMM會把該執行緒對應的本地記憶體中的共享變數值重新整理到主內
存。

為了實現volatile的記憶體語義,編譯器在生成位元組碼時,會在指令序列中插入記憶體屏障來
禁止特定型別的處理器重排序。對於編譯器來說,發現一個最優佈置來最小化插入屏障的總
數幾乎不可能。為此,JMM採取保守策略。下面是基於保守策略的JMM記憶體屏障插入策略。
·在每個volatile寫操作的前面插入一個StoreStore屏障。
·在每個volatile寫操作的後面插入一個StoreLoad屏障。
·在每個volatile讀操作的後面插入一個LoadLoad屏障。
·在每個volatile讀操作的後面插入一個LoadStore屏障。

上面記憶體屏障的簡單意思就是:StoreStore屏障,禁止上面的寫操作和下面的volatile寫重排序;StoreLoad屏障,禁止上面的寫操作和下面的volatile讀重排序;LoadLoad屏障,禁止上面的讀/寫操作和下面的volatile讀操作重排序;LoadStore屏障,禁止上面的讀操作和下面的volatile寫操作重排序。

由於Java的CAS同時具有volatile讀和volatile寫的記憶體語義,因此Java執行緒之間的通訊現
在有了下面4種方式。
1)A執行緒寫volatile變數,隨後B執行緒讀這個volatile變數。
2)A執行緒寫volatile變數,隨後B執行緒用CAS更新這個volatile變數。
3)A執行緒用CAS更新一個volatile變數,隨後B執行緒用CAS更新這個volatile變數。
4)A執行緒用CAS更新一個volatile變數,隨後B執行緒讀這個volatile變數。

總結一下適合使用volatile變數的使用條件(必須滿足所有條件):
1、對變數的寫操作不依賴變數的當前值,或者你能確保只有單執行緒更新變數的值。(簡單來說就是單執行緒寫,多執行緒讀的場景)
2、該變數不會與其他狀態變數一起納入不變性條件中。
3、在訪問變數時不需要加鎖。(如果要加鎖的話用普通變數就行了,沒必要用volatile了)

鎖的實現

鎖的釋放和獲取的記憶體語義
當執行緒釋放鎖時,JMM會把該執行緒對應的本地記憶體中的共享變數重新整理到主記憶體中。
當執行緒獲取鎖時,JMM會把該執行緒對應的本地記憶體置為無效。從而使得被監視器保護的
臨界區程式碼必須從主記憶體中讀取共享變數。
總結一下就是:
執行緒A釋放一個鎖,實質上是執行緒A向接下來將要獲取這個鎖的某個執行緒發出了(執行緒A
對共享變數所做修改的)訊息。
·執行緒B獲取一個鎖,實質上是執行緒B接收了之前某個執行緒發出的(在釋放這個鎖之前對共
享變數所做修改的)訊息。
·執行緒A釋放鎖,隨後執行緒B獲取這個鎖,這個過程實質上是執行緒A通過主記憶體向執行緒B發
送訊息。
這裡藉助ReentrantLock的原始碼,來分析鎖記憶體語義的具體實現機制。
在ReentrantLock中,呼叫lock()方法獲取鎖;呼叫unlock()方法釋放鎖。

public void lock() {
        sync.lock();
    }
public void unlock() {
        sync.release(1);
    }

它的鎖由靜態內部類Sync實現,分為公平鎖和非公平鎖。

abstract static class Sync extends AbstractQueuedSynchronizer 
static final class NonfairSync extends Sync  //非公平鎖
static final class FairSync extends Sync    //公平鎖

鎖的實現都依賴於Java同步器框架AbstractQueuedSynchronizer(AQS),AQS使用一個整型的volatile變數(命名為state)來維護同步狀態,它是ReentrantLock記憶體語義實現的關鍵。

/**
     * The synchronization state.
     */
    private volatile int state;

拿非公平鎖的實現舉例

final void lock() {
//AQS內方法,採用CAS實現監視器鎖,如果state為0,則獲得鎖,並設為1
            if (compareAndSetState(0, 1))
                //獲取鎖之後設定擁有鎖的執行緒為當前執行緒
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //如果鎖已被獲取,則加入等待佇列
                acquire(1);
        }

protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

非公平鎖在釋放鎖的最後寫volatile變數state,在獲取鎖時首先讀這個volatile變數。根據
volatile的happens-before規則,釋放鎖的執行緒在寫volatile變數之前可見的共享變數,在獲取鎖
的執行緒讀取同一個volatile變數後將立即變得對獲取鎖的執行緒可見。
compareAndSetState(0, 1)採用的CAS操作。JDK文件對該方法的說明如下:如果當前狀態值等於預期值,則以原子方式將同步狀態設定為給定的更新值。
可能會疑惑為什麼這個方法可以完成原子操作,原因是此操作具有volatile讀和寫的記憶體語義。主要是由sun.misc.Unsafe類實現的,它是native方法,這裡不做深入了。

AQS

分析一下同步器(AQS)的原理
AQS框架採用了模板模式,例如在獨佔模式的獲取和釋放,再拿ReentrantLock的非公平鎖NonfairSync舉例

/** 獨佔獲取 */
final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    //AQS中並未實現tryAcquire()方法,需在子類實現
    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }
    //NonfairSync中實現
    protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
/** 獨佔釋放 */
public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    //AQS中未實現
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }
    //Sync類中實現
    protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

從原始碼來看,同步狀態的維護、獲取、釋放動作是由子類實現的,而後續動作入執行緒的阻塞、喚醒機制等則由AQS框架實現。
AQS中使用LockSupport.park() 和 LockSupport.unpark() 的本地方法實現,實現執行緒的阻塞和喚醒。

private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);  //喚醒執行緒
    }

AQS內部維護了一個佇列,AQS將阻塞執行緒封裝成內部類Node的物件,並維護到這個佇列中。

private transient volatile Node head;   //頭結點
    private transient volatile Node tail;   //尾節點

這個佇列是非阻塞的 FIFO佇列,即插入移除節點的時候是非阻塞的,所以AQS內部採用CAS的方式保證節點插入和移除的原子性。

/**
     * CAS head field. Used only by enq.
     */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /**
     * CAS tail field. Used only by enq.
     */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

Node類原始碼如下

static final class Node {
        /** 標記是共享模式*/    
        static final Node SHARED = new Node();
        /** 標記是獨佔模式*/    
        static final Node EXCLUSIVE = null;
        /** 代表執行緒已經被取消*/    
        static final int CANCELLED =  1;
        /** 代表後續節點需要喚醒 */   
        static final int SIGNAL    = -1;
        /** 代表執行緒在等待某一條件/  
        static final int CONDITION = -2;

        static final int PROPAGATE = -3;
        volatile int waitStatus;
        volatile Node prev;
        volatile Node next;
        volatile Thread thread;
        /***連線到等待condition的下一個節點  */ 
        Node nextWaiter;
        final boolean isShared() {
            return nextWaiter == SHARED;
        }
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

獨佔式同步狀態獲取流程,也就是acquire(int arg)方法呼叫流程,如圖
這裡寫圖片描述
由圖所知,前驅節點為頭節點且能夠獲取同步狀態的判斷條件和執行緒進入等待狀態是獲
取同步狀態的自旋過程。當同步狀態獲取成功之後,當前執行緒從acquire(int arg)方法返回,如果
對於鎖這種併發元件而言,代表著當前執行緒獲取了鎖。
當前執行緒獲取同步狀態並執行了相應邏輯之後,就需要釋放同步狀態,使得後續節點能
夠繼續獲取同步狀態。通過呼叫同步器的release(int arg)方法可以釋放同步狀態,該方法在釋
放了同步狀態之後,會喚醒其後繼節點(進而使後繼節點重新嘗試獲取同步狀態)。

public final boolean release(int arg) {
//嘗試釋放鎖,將擁有鎖的執行緒設為null,把state設為0
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

分析了獨佔式同步狀態獲取和釋放過程後,適當做個總結:在獲取同步狀態時,同步器維護一個同步佇列,獲取狀態失敗的執行緒都會被加入到佇列中並在佇列中進行自旋;移出佇列(或停止自旋)的條件是前驅節點為頭節點且成功獲取了同步狀態。在釋放同步狀態時,同步器呼叫tryRelease(int arg)方法釋放同步狀態,然後喚醒頭節點的後繼節點。

原子變數類

原子變數類指的是java.util.concurrent.atomic包下的類。
整個包下面的類實現原理都接近,就是利用volatile和CAS來實現。
拿AtomicInteger舉例:

private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;

    static {
      try {
        valueOffset = unsafe.objectFieldOffset
            (AtomicInteger.class.getDeclaredField("value"));
      } catch (Exception ex) { throw new Error(ex); }
    }

    private volatile int value;

AtomicInteger類擁有一個unsafe物件,這是實現CAS的關鍵,private volatile int value就是它當前的值,用volatile來保證記憶體可見性。

public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }

這個方法完成CAS,如果value==expect,則把值設為update。這是核心方法,其他方法實現基本都用到它
例如

//獲取舊值設定新值
public final int getAndSet(int newValue) {
        for (;;) {
            int current = get();
            if (compareAndSet(current, newValue))
                return current;
        }
    }
    //獲取舊值並加1
    public final int getAndIncrement() {
        for (;;) {
            int current = get();
            int next = current + 1;
            if (compareAndSet(current, next))
                return current;
        }
    }

等方法。