1. 程式人生 > >原始碼分析AQS獨佔鎖、共享鎖和Condition的實現

原始碼分析AQS獨佔鎖、共享鎖和Condition的實現

AbstractQueuedSynchronizerjava.util.concurrent包下非常重要和基礎的類,concurrent包下提供了一系列的同步操作需要的工具,包括了ReentrantLockReentrantReadWriteLockThreadPoolExecutorCountDownLatchLinkedBlockingQueue等等,該包下很多的工具類都直接或者間接基於AQS提供的獨佔鎖、共享鎖和等待佇列實現了各自的同步需求。

一、AQS的設計:①AQS將資源抽象成一個int型的值:int state,通過對state的修改,達到對AQS不同狀態的控制,對state變數的修改由其子類實現;②並不是state>0才代表有資源可以申請,如ReentrantLock

中只有state==0的時候才獲取成功,state>0代表資源被佔用,什麼情況代表有資源可用是子類根據state的值做的一個抽象而已;③AQS的兩個方法:acquire(int arg)、release(int arg),分別用來獲取和釋放一定量的資源,即增大和減小state的值。當執行緒執行上述兩個方法時,AQS的子類嘗試修改state的值;④在acquire()方法中:若state大小符合特定需求(具體邏輯由子類實現),則執行緒會鎖定同步器,否則將當前執行緒加入到同步佇列中;在release()方法中:若state大小符合特定需求,則釋放掉當前執行緒佔有的資源,喚醒同步佇列中的執行緒。

二、通過ReentrantLock

的原始碼分析獨佔鎖:ReentrantLock中對state的設計為:state==0的時候代表沒有執行緒持有同步器,在state>0的時候,其它執行緒是不能獲取同步器的,必須加入到同步佇列中等待。 1、從lock()方法開始:這裡只分析NonfairSync

static final class NonfairSync extends Sync {
		//從這個方法開始申請鎖
        final void lock() {
        	//當前state的值為 0,執行緒直接獲取到鎖,setExclusiveOwnerThread()方法可以設定當前持有鎖的執行緒
            if
(compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else //說明已經有執行緒持有了鎖,則acquire()申請資源,acquire()函式的作用就是: //首先呼叫tryAcquire(),嘗試獲取資源,在此的具體實現就是nonfairTryAcquire()方法 //獲取資源失敗則將執行緒加入到同步佇列中等待 acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } abstract static class Sync extends AbstractQueuedSynchronizer { abstract void lock(); //嘗試獲取資源的地方 final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //首先還是判斷是否有執行緒已經持有鎖(c=0),還未有執行緒持有鎖則讓該執行緒持有鎖,設定state的值為acquires,返回true if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //已經有執行緒持有鎖,判斷持有鎖的執行緒和申請鎖的執行緒是否是同一個執行緒,如果是,讓state值增加acquires,返回true //也就是同一個執行緒可以重複獲取到同步器,從這裡可以看出為什麼ReentrantLock鎖是可重入的了 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //不是上面兩種情況,即獲取資源失敗,返回false return false; //判斷獲取到鎖的是不是當前執行緒 protected final boolean isHeldExclusively() { return getExclusiveOwnerThread() == Thread.currentThread(); } final ConditionObject newCondition() { //獲取Condition例項,ConditionObject的定義在AQS中 return new ConditionObject(); } final Thread getOwner() { //獲取當前持有鎖的執行緒,state值為 0,代表還未有執行緒持有鎖 return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } //state不為0,就代表有執行緒持有鎖了 final boolean isLocked() { return getState() != 0; } }

2、Node類:是同步佇列中節點的描述,用來儲存等待的執行緒、狀態、前驅和後繼節點等的資訊。

static final class Node {
        //標識共享模式的節點,共享模式下Node節點的nextWaiter變數設定為這個值
        static final Node SHARED = new Node();
        //標識獨佔模式的節點,獨佔模式nextWaiter變數是null
        static final Node EXCLUSIVE = null;
        //同步佇列中被取消的節點,被中斷或者等待超時,該狀態的節點不再被使用
        static final int CANCELLED =  1;
        //標識後繼節點處於被喚醒的狀態,當節點釋放同步器後,會喚醒後繼節點中第一個處於該狀態的節點
        static final int SIGNAL    = -1;
        //描述處於等待佇列中的節點,節點中的執行緒等待在Condition上,
        //在呼叫signal()之後,被喚醒的節點將從等待佇列中轉移到同步佇列中繼續等待
        static final int CONDITION = -2;
        //確保共享模式下可以喚醒後續的共享節點
        static final int PROPAGATE = -3;
        //儲存節點狀態,值為 0、CANCELLED、SIGNAL、CONDITION、PROPAGATE      
        volatile int waitStatus;
        
        //連結串列的上一個節點
        volatile Node prev;

        //連結串列的下一個節點
        volatile Node next;

        //儲存被阻塞的執行緒
        volatile Thread thread;

        //用來儲存某個Condition上的等待佇列,也用來判斷節點是否是共享節點
        Node nextWaiter;

        //判斷節點是否是共享模式,通過判斷nextWaiter==SHARED
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        //找到前驅節點
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {}

        Node(Node nextWaiter) {
            this.nextWaiter = nextWaiter;
            U.putObject(this, THREAD, Thread.currentThread());
        }

        Node(int waitStatus) {
            U.putInt(this, WAITSTATUS, waitStatus);
            U.putObject(this, THREAD, Thread.currentThread());
        }

        /** CAS設定節點的狀態 */
        final boolean compareAndSetWaitStatus(int expect, int update) {
            return U.compareAndSwapInt(this, WAITSTATUS, expect, update);
        }

        /** CAS設定後繼節點 */
        final boolean compareAndSetNext(Node expect, Node update) {
            return U.compareAndSwapObject(this, NEXT, expect, update);
        }
		//Unsafe 是CAS的工具類,CAS是虛擬機器實現的原子性操作
        private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
        private static final long NEXT;
        static final long PREV;
        private static final long THREAD;
        private static final long WAITSTATUS;
        static {
            try {
            	//分別拿到Node節點的next、prev、thread、waitStatus變數的控制代碼,CAS通過控制代碼修改這些變數
                NEXT = U.objectFieldOffset
                    (Node.class.getDeclaredField("next"));
                PREV = U.objectFieldOffset
                    (Node.class.getDeclaredField("prev"));
                THREAD = U.objectFieldOffset
                    (Node.class.getDeclaredField("thread"));
                WAITSTATUS = U.objectFieldOffset
                    (Node.class.getDeclaredField("waitStatus"));
            } catch (ReflectiveOperationException e) {
                throw new Error(e);
            }
        }
    }

3、AQS的acquire(int arg)方法:從NonfairSync 的實現看出,申請鎖(資源)的時候,首先會執行acquire()方法。 ①acquire()方法,嘗試獲取資源,失敗將執行緒加入同步佇列中

   public final void acquire(int arg) {
	//tryAcquire()嘗試獲取資源,返回false,執行acquireQueued()方法,將執行緒加入同步佇列中,讓執行緒進入等待狀態
	//addWaiter()這裡傳入的是獨佔標誌(Node.EXCLUSIVE),說明該節點是一個獨佔節點
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

addWaiter()方法,將當前執行緒封裝進Node節點,並將節點加入到同步佇列中,如果頭結點為null,則初始化頭結點和尾節點,並將當前節點連結在尾節點之後。

 private Node addWaiter(Node mode) {
    	//初始化一個Node節點,變數nextWaiter為mode
        Node node = new Node(mode);
        for (;;) {
            Node oldTail = tail;
            if (oldTail != null) {
            	//將當前節點設定成尾節點,連結在之前的尾節點之後
                U.putObject(node, Node.PREV, oldTail);
                if (compareAndSetTail(oldTail, node)) {
                    oldTail.next = node;
                    return node;
                }
            } else {
            	//初始化頭節點和尾節點指向同一個節點
                initializeSyncQueue();
            }
        }
    }

acquireQueued()方法,自旋直到執行緒持有同步器,這裡執行緒將進入等待狀態,直到其它執行緒釋放資源,喚醒處於隊首的執行緒,喚醒後該執行緒必須要繼續獲取到資源

final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            //自旋,直到執行緒持有同步器,(tryAcquire()成功)
            for (;;) {
                final Node p = node.predecessor();
                //該節點的前驅是頭節點,說明喚醒順序先進先出的
                //嘗試獲取資源,成功則返回interrupted,讓該執行緒持有同步器
                if (p == head && tryAcquire(arg)) {
                    setHead(node);	//設定節點為頭節點
                    p.next = null; // 幫助釋放記憶體
                    return interrupted;
                }
                //檢查節點狀態,如果可以進入等待狀態,則讓執行緒進入等待狀態
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

shouldParkAfterFailedAcquire()方法,設定節點的狀態,要為node節點找到一個符合需求的前驅節點,並設定其狀態為SIGNAL,表明node節點是一個在等待狀態的正常的節點

private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        //前驅節點的waitStatus為SIGNAL,說明後繼節點可以進入等待狀態
        if (ws == Node.SIGNAL)          
            return true;
        if (ws > 0) {
        	//前驅節點是CANCELLED,則跳過這些節點
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
        	//前驅節點狀態正常的情況下,設定狀態為SIGNAL,標誌後繼節點可以進入等待狀態
            pred.compareAndSetWaitStatus(ws, Node.SIGNAL);
        }
        return false;
    }

parkAndCheckInterrupt()方法:藉助LockSupport將當前執行緒阻塞,在喚醒之後返回中斷狀態,因為在阻塞狀態下,執行緒不響應中斷,在喚醒之後,如果有需要則重新處理該中斷。

 private final boolean parkAndCheckInterrupt() {
	//LockSupport中的park() 和 unpark() 的作用分別是阻塞執行緒和解除阻塞執行緒
	//其它說明可百度
        LockSupport.park(this);
        return Thread.interrupted();
    }

4、AQS的release(int arg)方法:acquire()方法完成了資源獲取成功後持有同步器,獲取失敗後加入到同步佇列中,並將執行緒阻塞等一系列操作。下面分析同步佇列執行緒被喚醒的過程。 ①ReentrantLockunlock()方法只調用了release(1)方法:tryRelease()由子類實現,用來判斷是否可以喚醒執行緒。如果可以則呼叫unparkSuccessor()喚醒一個執行緒

 public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            //這裡傳入的是頭結點,也就是說喚醒的是頭結點的後繼節點
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

ReentrantLocktryRelease()方法:

protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            //state==0才能喚醒同步佇列中的執行緒
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            //重新設定state的值
            setState(c);
            return free;
        }

unparkSuccessor(Node node)方法:

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            node.compareAndSetWaitStatus(ws, 0);
	//找到需要被喚醒的節點
        Node s = node.next;
        //如果當前節點的後繼節點不符合要求(為null或者被取消了),則從尾節點向前查詢
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node p = tail; p != node && p != null; p = p.prev)
                if (p.waitStatus <= 0)
                    s = p;
        }
        //喚醒該執行緒,執行緒的阻塞是在acquireQueued()方法中,執行緒將從阻塞的地方繼續執行
        if (s != null)
            LockSupport.unpark(s.thread);
    }

三、通過CountDownLatch分析共享鎖:CountDownLatch在釋放同步器之後會一次喚醒所有等待在同步佇列上的執行緒。主要功能是:允許一個或者多個執行緒等待其他執行緒完成任務,然後這些等待的執行緒同時被喚醒,使用很簡單,下面主要分析共享鎖的實現,在ReentrantLock中分析過的函式不再分析。 1、CountDownLatch中的自定義同步器:

//CountDownLatch的建構函式,傳入了一個int值,作為state的初始值
public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        this.sync = new Sync(count);
    }
private static final class Sync extends AbstractQueuedSynchronizer {
        Sync(int count) {
	//設定state的初始值為自己傳入的引數
            setState(count);
        }

        int getCount() {
            return getState();
        }
	//只有state為 0的時候,也就是其它執行緒全部執行完成後,才算獲取資源成功,返回1
        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
        	//只有在state==1的時候才會返回true,進而釋放同步器,喚醒等待的執行緒
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c - 1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0;
            }
        }
    }

2、CountDownLatch中的await()方法:獲取資源,只調用了AQS的acquireSharedInterruptibly()方法,該方法會丟擲中斷異常,在呼叫await()時要捕獲異常。

 public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        //嘗試獲取資源,返回int型值,tryAcquire()返回bool值
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

3、doAcquireSharedInterruptibly()方法:

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        //將一個共享節點新增到同步佇列中
        final Node node = addWaiter(Node.SHARED);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    //嘗試獲取資源成功後(r>=0),setHeadAndPropagate()將head節點指向node,然後喚醒同步佇列中的共享節點
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        return;
                    }
                }
                //修改節點的狀態,阻塞執行緒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //如果等待過程中產生了中斷,則丟擲中斷異常
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

4、setHeadAndPropagate()方法:設定頭結點為node節點,並嘗試喚醒後續的共享節點

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head;