1. 程式人生 > >JAVA互斥鎖(synchronized&Lock):行為分析及原始碼

JAVA互斥鎖(synchronized&Lock):行為分析及原始碼

JVM中有這樣一段註釋:

// The base-class, PlatformEvent, is platform-specific while the ParkEvent is
// platform-independent.  PlatformEvent provides park(), unpark(), etc., and
// is abstract -- that is, a PlatformEvent should never be instantiated except
// as part of a ParkEvent.
// Equivalently we could have defined a platform-independent base-class that
// exported Allocate(), Release(), etc.  The platform-specific class would extend
// that base-class, adding park(), unpark(), etc.
//
// A word of caution: The JVM uses 2 very similar constructs:
// 1. ParkEvent are used for Java-level "monitor" synchronization.
// 2. Parkers are used by JSR166-JUC park-unpark.
//
// We'll want to eventually merge these redundant facilities and use ParkEvent.

其中是說ParkEvent用於Java語言級別的關鍵字synchronized。 Parkers用於Java類庫中的併發資料集合,該集合是由JSR166發展來的。 這裡說這兩個東西功能類似,將來會統一使用ParkEvent。 那麼它們究竟有什麼區別呢? 我們先看看這兩個類的大概介面樣子: (ParkEvent)

class ParkEvent : public os::PlatformEvent {
  private:
    ParkEvent * FreeNext ;

    // Current association
    Thread * AssociatedWith ;
    intptr_t RawThreadIdentity ;        // LWPID etc
    volatile int Incarnation ;

class PlatformEvent : public CHeapObj<mtInternal> {
    // Use caution with reset() and fired() -- they may require MEMBARs
    void reset() { _Event = 0 ; }
    int  fired() { return _Event; }
    void park () ;
    void unpark () ;
    int  TryPark () ;
    int  park (jlong millis) ; // relative timed-wait only
class Parker : public os::PlatformParker {
public:
  // For simplicity of interface with Java, all forms of park (indefinite,
  // relative, and absolute) are multiplexed into one call.
  void park(bool isAbsolute, jlong time);
  void unpark();

  // Lifecycle operators
  static Parker * Allocate (JavaThread * t) ;
  static void Release (Parker * e) ;
private:
  static Parker * volatile FreeList ;
  static volatile int ListLock ;

可以看到它們提供一致的相同介面,park和unpark。從而支撐Java中併發控制的功能。 它們究竟有什麼不同呢?我們首先來執行2段類似的程式碼。 阻塞執行緒獲取鎖的順序完全相反 首先是使用synchronized提供的鎖機制,我們隨便用一個Object lock = new Object()作為鎖關聯的物件,程式碼如下,它的功能是讓10個執行緒進入阻塞狀態,然後釋放鎖,觀察隨後執行緒獲取鎖的順序: (可執行程式碼)

 
package com.psly.testLocks;

public class TestLockSynchronized {

	private static Object lock = new Object();
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		int N = 10;
		Thread[] threads = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				public void run() {
					synchronized(lock){
						System.out.println(Thread.currentThread().getName() + " get synch lock!");
						try {
							Thread.sleep(200);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					}
				}
				
			});
		}
		synchronized(lock){
			for(int i = 0; i < N; ++i){
				threads[i].start();
				Thread.sleep(200);
			}
		}
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
	}
}

我們用一個0.2seconds的時間,從而讓先建立的執行緒能夠先進入阻塞狀態,輸出為:

      
Thread-9 get synch lock!
Thread-8 get synch lock!
Thread-7 get synch lock!
Thread-6 get synch lock!
Thread-5 get synch lock!
Thread-4 get synch lock!
Thread-3 get synch lock!
Thread-2 get synch lock!
Thread-1 get synch lock!
Thread-0 get synch lock! 

這有點奇怪,先嚐試獲取鎖的執行緒竟然後獲得鎖! 先不管這個, 我們把這個例子改為JSR166的Lock重做一遍: (可執行程式碼)

     
package com.psly.testLocks;

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {

	private static Lock lock = new ReentrantLock();
	public static void main(String[] args) throws InterruptedException {
		// TODO Auto-generated method stub
		int N = 10;
		Thread[] threads = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				public void run() {
					lock.lock();
						System.out.println(Thread.currentThread().getName() + " get JSR166 lock!");
						try {
							Thread.sleep(200);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
					lock.unlock();
				}
				
			});
		}
		lock.lock();
			for(int i = 0; i < N; ++i){
				threads[i].start();
				Thread.sleep(200);
			}
		lock.unlock();
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
	}
}

輸出為:

            
Thread-0 get JSR166 lock!
Thread-1 get JSR166 lock!
Thread-2 get JSR166 lock!
Thread-3 get JSR166 lock!
Thread-4 get JSR166 lock!
Thread-5 get JSR166 lock!
Thread-6 get JSR166 lock!
Thread-7 get JSR166 lock!
Thread-8 get JSR166 lock!
Thread-9 get JSR166 lock!

這個輸出比較符合了我們的預期,畢竟先嚐試獲取鎖的的確先獲取了鎖。 為什麼這兩種實現有這樣的差異呢,我們來看下他們分別的阻塞佇列實現,首先是JAVA的:

     
    public void lock() {
        sync.lock();
    }

        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
	

我們這裡重點檢視addWaiter(Node node);可以看出來,執行緒構造的阻塞節點是通過tail欄位加入進佇列的,並且作為next節點。這是個先進先出雙向佇列。 所以當鎖被釋放時,阻塞執行緒獲取鎖的順序與進阻塞佇列是一致的。 我們接著看下synchronized的實現,這裡涉及到JVM中系統程式設計的原始碼,這裡也只貼出跟進入阻塞佇列相關的程式碼:

                   
class ObjectMonitor;

class ObjectSynchronizer : AllStatic {
  static void fast_enter  (Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS);
  static void slow_enter  (Handle obj, BasicLock* lock, TRAPS);
void ObjectSynchronizer::fast_enter(Handle obj, BasicLock* lock, bool attempt_rebias, TRAPS) {
 if (UseBiasedLocking) {
    if (!SafepointSynchronize::is_at_safepoint()) {
      BiasedLocking::Condition cond = BiasedLocking::revoke_and_rebias(obj, attempt_rebias, THREAD);
      if (cond == BiasedLocking::BIAS_REVOKED_AND_REBIASED) {
        return;
      }
    } else {
      assert(!attempt_rebias, "can not rebias toward VM thread");
      BiasedLocking::revoke_at_safepoint(obj);
    }
    assert(!obj->mark()->has_bias_pattern(), "biases should be revoked by now");
 }

 slow_enter (obj, lock, THREAD) ;
}
void ObjectSynchronizer::slow_enter(Handle obj, BasicLock* lock, TRAPS) {
  markOop mark = obj->mark();
  assert(!mark->has_bias_pattern(), "should not see bias pattern here");
  ................
  ObjectSynchronizer::inflate(THREAD, obj())->enter(THREAD);
}
void ATTR ObjectMonitor::enter(TRAPS) {
  // The following code is ordered to check the most common cases first
  // and to reduce RTS->RTO cache line upgrades on SPARC and IA32 processors.
  Thread * const Self = THREAD ;
  void * cur ;
  ·········
    for (;;) {
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition()
      // or java_suspend_self()

      EnterI (THREAD) ;

      if (!ExitSuspendEquivalent(jt)) break ;

      //
      // We have acquired the contended monitor, but while we were
      // waiting another thread suspended us. We don't want to enter
      // the monitor while suspended because that would surprise the
      // thread that suspended us.
      //
          _recursions = 0 ;
     ······
}

void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    // Try the lock - TATAS
    if (TryLock (Self) > 0) {
			......
    }

    if (TrySpin (Self) > 0) {
       ......
    }

    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    // Push "Self" onto the front of the _cxq.
    // Once on cxq/EntryList, Self stays on-queue until it acquires the lock.
    // Note that spinning tends to reduce the rate at which threads
    // enqueue and dequeue on EntryList|cxq.
    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }
	

這裡的重點是,ObjectWaiter node(Self)

                     
ObjectWaiter node(Self) ;
........
 for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }
	

_cxq,我們採用比較並交換的原子指令,修改了_cxq,修改之前將_cxq的舊值填入node的next欄位,這樣一來我們就在_cxq上構造了個stack,也就是先進後出的佇列。於是下次當我們索取_cxq時候自然就取得了最後填入的值。這解釋了我們上面的執行示例,阻塞執行緒獲取鎖的順序與進佇列完全相反。 我們接著看下再複雜點的例子,依次啟動10個執行緒,依次獲取鎖,獲得鎖的同時列印自身資訊,然後主動呼叫wait語義的方法陷入阻塞狀態。等到這10個執行緒都阻塞之後主執行緒獲取鎖,接著再啟動10個無等待執行緒,這是個執行緒唯一做的事情就是依次獲取鎖,他們會按照我們上面所說的方式進入阻塞佇列。接著主執行緒依次傳送4次notify語義的訊號(注意時間間隔),然後釋放鎖。我們感興趣的是這幾個收到通知的執行緒,他們相對已經在阻塞佇列中的執行緒,誰會先獲取鎖?他們的排列又是怎麼樣的呢? 我們先執行JSR166的版本,程式碼如下: (可執行程式碼)

                
package com.psly.testLocks;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class TestLockSynchronized {

	private static Lock lock = new ReentrantLock();
	private static Condition condition = lock.newCondition();
	public static void main(String[] args) throws InterruptedException {
		int N = 10;
		Thread[] threads = new Thread[N];
		Thread[] threadsForWaits = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					lock.lock();		
					System.out.println(Thread.currentThread().getName() + " nowait get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					lock.unlock();
				}
				
			});
		}
		for(int i = 0; i < N; ++i){
			threadsForWaits[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					// TODO Auto-generated method stub
					lock.lock();		//synchronized(lock){
					System.out.println(Thread.currentThread().getName() + " wait first get lock");
					try {
						condition.await();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
					System.out.println(Thread.currentThread().getName() + " wait second get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					lock.unlock();
				}
				
			});
		}
		
		for(int i = 0; i < N; ++i){
			threadsForWaits[i].start();
			Thread.sleep(200);
		}
		lock.lock();	//synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		for(int i = 0; i < 4 ; ++i){
				condition.signal();
		}
		Thread.sleep(200);
		lock.unlock();
			
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
		
		for(int i = 0; i < N; ++i)
			threadsForWaits[i].join();
		
	}

}

Thread-10到Thread-19為主動呼叫wait阻塞的執行緒,Thread-0到Thread-9為只獲取鎖的執行緒。 輸出為:

           
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-0 nowait get lock
Thread-1 nowait get lock
Thread-2 nowait get lock
Thread-3 nowait get lock
Thread-4 nowait get lock
Thread-5 nowait get lock
Thread-6 nowait get lock
Thread-7 nowait get lock
Thread-8 nowait get lock
Thread-9 nowait get lock
Thread-10 wait second get lock
Thread-11 wait second get lock
Thread-12 wait second get lock
Thread-13 wait second get 

可以看到JSR166的實現依然滿足先進先出,即使Thread-10到Thread-13是先獲取鎖之後陷入wait的。我們接著看下這是如何做到的, 注意JSR166的實現是在JAVA層面完成的。 主要是三個呼叫:wait,notify,unlock。 await:

           
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }
		

這裡的addConditionWaiter嘗試新增等待佇列的節點。acquireQueued用於將來被喚醒之後的再次嘗試獲取鎖。 我們來看addConditionWaiter,

                            

        /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

是將新節點作為lastWaiter的next節點,並且本身成為lastWaiter節點。那麼這裡說明這構造的是一個先進先出的佇列。(這裡是在已經獲取鎖的情況下,所以不需同步) 我們接著看 signal

                            
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

    final boolean transferForSignal(Node node) {
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }  
	

以上已經拿到了等待佇列第一個節點,接著enq讓他轉移(transfer)到鎖的阻塞佇列

                            
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
	

這樣一來我們就完成了將等待執行緒從處於wait的狀態,轉移到了未獲得鎖處於阻塞的狀態。 最後看下當主執行緒釋放鎖時的操作: unlock

     
    public void unlock() {
        sync.release(1);
    }
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }
	

可以看到,呼叫unlock的執行緒喚醒了阻塞佇列head中的第一個執行緒。 (OVER) 因為阻塞佇列跟等待佇列都是先進先出,這樣子能夠得到一個比較好的行為。 從而導致了我們之前的輸出,看上去比較符合預期。 最後我們來看看採用synchronized,這個示例下的輸出是什麼,程式碼如下: (可執行程式碼)

     
package com.psly.testLocks;

public class TestLockSynchronized {

	private static Object lock = new Object();
	public static void main(String[] args) throws InterruptedException {
		int N = 10;
		Thread[] threads = new Thread[N];
		Thread[] threadsForWaits = new Thread[N];
		for(int i = 0; i < N; ++i){
			threads[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					synchronized(lock){		
					System.out.println(Thread.currentThread().getName() + " nowait get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					}
				}
				
			});
		}
		for(int i = 0; i < N; ++i){
			threadsForWaits[i] = new Thread(new Runnable(){
				@Override
				public void run() {
					// TODO Auto-generated method stub
					synchronized(lock){
					System.out.println(Thread.currentThread().getName() + " wait first get lock");
					try {
						lock.wait();
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					
					System.out.println(Thread.currentThread().getName() + " wait second get lock");
					try {
						Thread.sleep(200);
					} catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					}
				}
				
			});
		}
		
		for(int i = 0; i < N; ++i){
			threadsForWaits[i].start();
			Thread.sleep(200);
		}
		synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		for(int i = 0; i < 4 ; ++i){
			lock.notify();
		}
		Thread.sleep(200);
		}
			
			
		for(int i = 0; i < N; ++i)
			threads[i].join();
		
		for(int i = 0; i < N; ++i)
			threadsForWaits[i].join();
		
	}

}

輸出入下:

                            

Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock
                            

這個結果很奇怪,最奇怪在於居然是連續輸出Thread-10,Thread-13,Thread-12,Thread-11:
     
Thread-10 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock

我們調整一下發送notify的數量,給出所有等待執行緒數量的呼叫N。

  
		for(int i = 0; i < N ; ++i){
			lock.notify();
		}
		

輸出為:

  
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-10 wait second get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock

依然是Thread-10莫名其妙出現在最前,後面緊接著Thread-19到Thread-11倒序。 我們再嘗試換下呼叫方式,採用notifyAll();

  
	synchronized(lock){
		for(int i = 0; i < N; ++i){
			threads[i].start();
				Thread.sleep(200);
		}
		lock.notifyAll();
		Thread.sleep(200);
		}
		

輸出為:

  
Thread-10 wait first get lock
Thread-11 wait first get lock
Thread-12 wait first get lock
Thread-13 wait first get lock
Thread-14 wait first get lock
Thread-15 wait first get lock
Thread-16 wait first get lock
Thread-17 wait first get lock
Thread-18 wait first get lock
Thread-19 wait first get lock
Thread-19 wait second get lock
Thread-18 wait second get lock
Thread-17 wait second get lock
Thread-16 wait second get lock
Thread-15 wait second get lock
Thread-14 wait second get lock
Thread-13 wait second get lock
Thread-12 wait second get lock
Thread-11 wait second get lock
Thread-10 wait second get lock
Thread-9 nowait get lock
Thread-8 nowait get lock
Thread-7 nowait get lock
Thread-6 nowait get lock
Thread-5 nowait get lock
Thread-4 nowait get lock
Thread-3 nowait get lock
Thread-2 nowait get lock
Thread-1 nowait get lock
Thread-0 nowait get lock

這下子又變了,Thread-10變為最後,完全逆序來獲取鎖了。 我們嘗試進入JVM去看下這一切是怎麼回事。與之前的過程類似,首先我們來看看等待之後執行緒節點如何組織的: 經研究,應該是下面這片程式碼: WAIT:

 
void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
   Thread * const Self = THREAD ;


   TEVENT (Wait) ;

   assert (Self->_Stalled == 0, "invariant") ;
   Self->_Stalled = intptr_t(this) ;
   jt->set_current_waiting_monitor(this);

   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;
   Self->_ParkEvent->reset() ;
   OrderAccess::fence();          // ST into Event; membar ; LD interrupted-flag

   // Enter the waiting queue, which is a circular doubly linked list in this case
   // but it could be a priority queue or any data structure.
   // _WaitSetLock protects the wait queue.  Normally the wait queue is accessed only
   // by the the owner of the monitor *except* in the case where park()
   // returns because of a timeout of interrupt.  Contention is exceptionally rare
   // so we use a simple spin-lock instead of a heavier-weight blocking lock.

   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
inline void ObjectMonitor::AddWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev == NULL, "node already in list");
  assert(node->_next == NULL, "node already in list");
  // put node at end of queue (circular doubly linked list)
  if (_WaitSet == NULL) {
    _WaitSet = node;
    node->_prev = node;
    node->_next = node;
  } else {
    ObjectWaiter* head = _WaitSet ;
    ObjectWaiter* tail = head->_prev;
    assert(tail->_next == head, "invariant check");
    tail->_next = node;
    head->_prev = node;
    node->_next = head;
    node->_prev = tail;
  }
}

如果_WaitSet為空,則設定它,並且前驅和後繼都是它。 如果只有_WaitSet一個,則將節點增加到它的後面。 不為空的情況下統一新增到next節點。 所以這裡是個先進先出的佇列。 notify

void ObjectMonitor::notify(TRAPS) {
  CHECK_OWNER();
  if (_WaitSet == NULL) {
     TEVENT (Empty-Notify) ;
     return ;
  }
  DTRACE_MONITOR_PROBE(notify, this, object(), THREAD);

  int Policy = Knob_MoveNotifyee ;

  Thread::SpinAcquire (&_WaitSetLock, "WaitSet - notify") ;
  ObjectWaiter * iterator = DequeueWaiter() ;
inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}

inline ObjectWaiter* ObjectMonitor::DequeueWaiter() {
  // dequeue the very first waiter
  ObjectWaiter* waiter = _WaitSet;
  if (waiter) {
    DequeueSpecificWaiter(waiter);
  }
  return waiter;
}
inline void ObjectMonitor::DequeueSpecificWaiter(ObjectWaiter* node) {
  assert(node != NULL, "should not dequeue NULL node");
  assert(node->_prev != NULL, "node already removed from list");
  assert(node->_next != NULL, "node already removed from list");
  // when the waiter has woken up because of interrupt,
  // timeout or other spurious wake-up, dequeue the
  // waiter from waiting list
  ObjectWaiter* next = node->_next;
  if (next == node) {
    assert(node->_prev == node, "invariant check");
    _WaitSet = NULL;
  } else {
    ObjectWaiter* prev = node->_prev;
    assert(prev->_next == node, "invariant check");
    assert(next->_prev == node, "invariant check");
    next->_prev = prev;
    prev->_next = next;
    if (_WaitSet == node) {
      _WaitSet = next;
    }
  }
  node->_next = NULL;
  node->_prev = NULL;
}
static int Knob_MoveNotifyee       = 2 ;       // notify() - disposition of notifyee
     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         if (List == NULL) {
             iterator->_next = iterator->_prev = NULL ;
             _EntryList = iterator ;
         } else {
            iterator->TState = ObjectWaiter::TS_CXQ ;
            for (;;) {
                ObjectWaiter * Front = _cxq ;
                iterator->_next = Front ;
                if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                    break ;
                }
            }
         }
     } else
     if (Policy == 3) {      // append to cxq
        iterator->TState = ObjectWaiter::TS_CXQ ;
        for (;;) {
            ObjectWaiter * Tail ;
            Tail = _cxq ;
            if (Tail == NULL) {
			
			

這裡的DequeueWaiter呼叫DequeueSpecificWaiter,效果是隊列出一個元素,_WaitSet.next成為_WaitSet。這裡有_EntryList、_cxq兩個資料結構。 接著我們走Policy==2分支,注意這裡並不是全部放入cxq(儘管註釋如此),判斷是_EntryList==NULL的時候,直接將我們的節點放入它。否則,將我們的節點新增到_cxq這個stack前面。想象一個,假如第一個節點進來,發現_EntryList為空,_EntryList設定為它自己。從第二個節點開始,所有節點都是進stack,這樣的話是不是取出時,第二個往後的節點都顛倒了呢。假如我們取節點的方式是先驅_EntryList,然後再取stack中的元素。則就會發生示例中Thread-10提前的亂序情況。 但是注意,之前的notifyAll並沒有產生這種效果。所以我們來看下notifyAll的程式碼:

     if (Policy == 2) {      // prepend to cxq
         // prepend to cxq
         iterator->TState = ObjectWaiter::TS_CXQ ;
         for (;;) {
             ObjectWaiter * Front = _cxq ;
             iterator->_next = Front ;
             if (Atomic::cmpxchg_ptr (iterator, &_cxq, Front) == Front) {
                 break ;
             }
         }
     } else
	 

果然如此!notifyAll的邏輯跟notify大部分一樣,除了它將所有節點都加入cxq。所以我們才會觀察到notifyAll呼叫之後的節點獲取鎖順序是逆序。 unlock 我們接著看看unlock的時候,是不是如我們猜測的那樣先取_EntryList的元素,再來看cxq。

void ObjectSynchronizer::slow_exit(oop object, BasicLock* lock, TRAPS) {
  fast_exit (object, lock, THREAD) ;
}
void ObjectSynchronizer::fast_exit(oop object, BasicLock* lock, TRAPS) {
  assert(!object->mark()->has_bias_pattern(), "should not see bias pattern here");
  // if displaced header is null, the previous enter is recursive enter, no-op
  .........

  ObjectSynchronizer::inflate(THREAD, object)->exit (true, THREAD) ;
}
void ATTR ObjectMonitor::exit(bool not_suspended, TRAPS) {
       ...................
       for (;;) {
      assert (THREAD == _owner, "invariant") ;


      if (Knob_ExitPolicy == 0) {

        .........
      } else {
			..........
         } else {
            TEVENT (Inflated exit - complex egress) ;
         }
      }

      guarantee (_owner == THREAD, "invariant") ;

      ObjectWaiter * w = NULL ;
      int QMode = Knob_QMode ;

  

      w = _EntryList  ;
      if (w != NULL) {

          assert (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }

      w = _cxq ;
      if (w == NULL) continue ;

      for (;;) {
          assert (w != NULL, "Invariant") ;
          ObjectWaiter * u = (ObjectWaiter *) Atomic::cmpxchg_ptr (NULL, &_cxq, w) ;
          if (u == w) break ;
          w = u ;
      }
 

      if (QMode == 1) {
               ..............
      } else {
         _EntryList = w ;
         ObjectWaiter * q = NULL ;
         ObjectWaiter * p ;
         for (p = w ; p != NULL ; p = p->_next) {
             guarantee (p->TState == ObjectWaiter::TS_CXQ, "Invariant") ;
             p->TState = ObjectWaiter::TS_ENTER ;
             p->_prev = q ;
             q = p ;
         }
      }

      if (_succ != NULL) continue;

      w = _EntryList  ;
      if (w != NULL) {
          guarantee (w->TState == ObjectWaiter::TS_ENTER, "invariant") ;
          ExitEpilog (Self, w) ;
          return ;
      }
   }
static int Knob_QMode              = 0 ;       // EntryList-cxq policy - queue discipline
static int Knob_ExitPolicy         = 0 ;

這裡是先取_EntryList,假如有就呼叫ExitEpilog並返回,否則採用原子操作取_cxq,然後將這個值再次給_EntryList,並呼叫ExitEpilog。 總之這裡最終都是將資料給_EntryList,只不過假如_EntryList原本就有值,那麼我們會先使用它,之後再使用_cxq。 我們看下ExitEpilog完成了什麼事:

void ObjectMonitor::ExitEpilog (Thread * Self, ObjectWaiter * Wakee) {
   assert (_owner == Self, "invariant") ;

   // Exit protocol:
   // 1. ST _succ = wakee
   // 2. membar #loadstore|#storestore;
   // 2. ST _owner = NULL
   // 3. unpark(wakee)

   _succ = Knob_SuccEnabled ? Wakee->_thread : NULL ;
   ParkEvent * Trigger = Wakee->_event ;

   Wakee  = NULL ;

   // Drop the lock
   OrderAccess::release_store_ptr (&_owner, NULL) ;
   OrderAccess::fence() ;                               // ST _owner vs LD in unpark()

   if (SafepointSynchronize::do_call_back()) {
      TEVENT (unpark before SAFEPOINT) ;
   }

   Trigger->unpark() ;

   // Maintain stats and report events to JVMTI
   if (ObjectMonitor::_sync_Parks != NULL) {
      ObjectMonitor::_sync_Parks->inc() ;
   }
}

果然這裡最後呼叫了unpark,從而喚醒了相應的那個執行緒。這裡的_EntryList的值會如何變化?我們最後看下,當等待執行緒從wait中醒過來會做什麼: // Note: a subset of changes to ObjectMonitor::wait() // will need to be replicated in complete_exit above

void ObjectMonitor::wait(jlong millis, bool interruptible, TRAPS) {
..........
   ObjectWaiter node(Self);
   node.TState = ObjectWaiter::TS_WAIT ;

.............
   Thread::SpinAcquire (&_WaitSetLock, "WaitSet - add") ;
   AddWaiter (&node) ;
   Thread::SpinRelease (&_WaitSetLock) ;
............
   if (node.TState == ObjectWaiter::TS_WAIT) {
         Thread::SpinAcquire (&_WaitSetLock, "WaitSet - unlink") ;
         if (node.TState == ObjectWaiter::TS_WAIT) {
            DequeueSpecificWaiter (&node) ;       // unlink from WaitSet
            assert(node._notified == 0, "invariant");
            node.TState = ObjectWaiter::TS_RUN ;
         }
..............
     assert (_owner != Self, "invariant") ;
     ObjectWaiter::TStates v = node.TState ;
     if (v == ObjectWaiter::TS_RUN) {
         enter (Self) ;
     } else {
	 

進入了enter(Self),

void ATTR ObjectMonitor::enter(TRAPS) {
    for (;;) {
      jt->set_suspend_equivalent();
      // cleared by handle_special_suspend_equivalent_condition()
      // or java_suspend_self()

      EnterI (THREAD) ;

進入EnterI(THREAD),
void ATTR ObjectMonitor::EnterI (TRAPS) {
    Thread * Self = THREAD ;
    assert (Self->is_Java_thread(), "invariant") ;
    assert (((JavaThread *) Self)->thread_state() == _thread_blocked   , "invariant") ;

    if (TryLock (Self) > 0) {
        assert (_succ != Self              , "invariant") ;
        assert (_owner == Self             , "invariant") ;
        assert (_Responsible != Self       , "invariant") ;
        return ;
    }

    DeferredInitialize () ;


    if (TrySpin (Self) > 0) {
        assert (_owner == Self        , "invariant") ;
        assert (_succ != Self         , "invariant") ;
        assert (_Responsible != Self  , "invariant") ;
        return ;
    }

    // The Spin failed -- Enqueue and park the thread ...
    assert (_succ  != Self            , "invariant") ;
    assert (_owner != Self            , "invariant") ;
    assert (_Responsible != Self      , "invariant") ;


    ObjectWaiter node(Self) ;
    Self->_ParkEvent->reset() ;
    node._prev   = (ObjectWaiter *) 0xBAD ;
    node.TState  = ObjectWaiter::TS_CXQ ;

    ObjectWaiter * nxt ;
    for (;;) {
        node._next = nxt = _cxq ;
        if (Atomic::cmpxchg_ptr (&node, &_cxq, nxt) == nxt) break ;

        // Interference - the CAS failed because _cxq changed.  Just retry.
        // As an optional optimization we retry the lock.
        if (TryLock (Self) > 0) {
            assert (_succ != Self         , "invariant") ;
            assert (_owner == Self        , "invariant") ;
            assert (_Responsible != Self  , "invariant") ;
            return ;
        }
    }

    if ((SyncFlags & 16) == 0 && nxt == NULL && _EntryList == NULL) {
        // Try to assume the role of responsible thread for the monitor.
        // CONSIDER:  ST vs CAS vs { if (Responsible==null) Responsible=Self }
        Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
    }

    TEVENT (Inflated enter - Contention) ;
    int nWakeups = 0 ;
    int RecheckInterval = 1 ;

    for (;;) {

        if (TryLock (Self) > 0) break ;
        assert (_owner != Self, "invariant") ;

        if ((SyncFlags & 2) && _Responsible == NULL) {
           Atomic::cmpxchg_ptr (Self, &_Responsible, NULL) ;
        }

        if (_Responsible == Self || (SyncFlags & 1)) {
            TEVENT (Inflated enter - park TIMED) ;
            Self->_ParkEvent->park ((jlong) RecheckInterval) ;
            RecheckInterval *= 8 ;
            if (RecheckInterval > 1000) RecheckInterval = 1000 ;
        } else {
            TEVENT (Inflated enter - park UNTIMED) ;
            Self->_ParkEvent->park() ;
        }

        if (TryLock(Self) > 0) break ;

        TEVENT (Inflated enter - Futile wakeup) ;
        if (ObjectMonitor::_sync_FutileWakeups != NULL) {
           ObjectMonitor::_sync_FutileWakeups->inc() ;
        }
        ++ nWakeups ;

        if ((Knob_ResetEvent & 1) && Self->_ParkEvent->fired()) {
           Self->_ParkEvent->reset() ;
           OrderAccess::fence() ;
        }
        if (_succ == Self) _succ = NULL ;

        // Invariant: after clearing _succ a thread *must* retry _owner before parking.
        OrderAccess::fence() ;
    }

    assert (_owner == Self      , "invariant") ;
    assert (object() != NULL    , "invariant") ;
    // I'd like to write:
    //   guarantee (((oop)(object()))->mark() == markOopDesc::encode(this), "invariant") ;
    // but as we're at a safepoint that's not safe.

    UnlinkAfterAcquire (Self, &node) ;
    if (_succ == Self) _succ = NULL ;

    assert (_succ != Self, "invariant") ;
    if (_Responsible == Self) {
        _Responsible = NULL ;
        OrderAccess::fence(); // Dekker pivot-point
	.........
	

這裡的重點是UnlinkAfterAcquire,

void ObjectMonitor::UnlinkAfterAcquire (Thread * Self, ObjectWaiter * SelfNode)
{
    assert (_owner == Self, "invariant") ;
    assert (SelfNode->_thread == Self, "invariant") ;

    if (SelfNode->TState == ObjectWaiter::TS_ENTER) {
        // Normal case: remove Self from the DLL EntryList .
        // This is a constant-time operation.
        ObjectWaiter * nxt = SelfNode->_next ;
        ObjectWaiter * prv = SelfNode->_prev ;
        if (nxt != NULL) nxt->_prev = prv ;
        if (prv != NULL) prv->_next = nxt ;
        if (SelfNode == _EntryList ) _EntryList = nxt ;
        assert (nxt == NULL || nxt->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        assert (prv == NULL || prv->TState == ObjectWaiter::TS_ENTER, "invariant") ;
        TEVENT (Unlink from EntryList) ;
    } else {
	

它會將_EntryList的值做更新,從而讓鎖的獲取繼續下去,保證不會出錯。 到這裡為止,我們終於大致走完了一遍synchronized鎖與lock鎖分別在JVM和JUC中的實現。 那麼有個問題,linux中pthread鎖的實現,行為模式又是怎麼樣的呢? 我們嘗試將使用pthread來執行測試這兩個例子: 鎖獲取程式碼: 編譯生成執行檔案testLock:gcc -pthread testLock.c -o testLock 執行:./testLock

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
#define N 10
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

int main(){
//  int N = 10;
    pthread_t threads[N];
    int i = 0;
    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)i);
        usleep(100000);
    }
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    return 0;

}

輸出:

0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock

鎖獲取+阻塞等待程式碼:

#include <stdio.h>
#include <pthread.h>
#include <unistd.h>

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
#define N 10
void* runTask(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d get lock\n", (int)pm);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    return 0;
}

void* runTaskWithWait(void* pm){
    pthread_mutex_lock(&mutex);
    printf("%d wait first get lock\n", (int)pm);
    pthread_cond_wait(&cond, &mutex);
    printf("%d wait second get lock\n", (int)pm);
    usleep(300000);
    pthread_mutex_unlock(&mutex);
}
int main(){
//  int N = 10;
    pthread_t threads[N];
    pthread_t threadsForWaits[N];
    int i = 0;
    for(; i < N; ++i){
        pthread_create(&threadsForWaits[i], 0, runTaskWithWait, (void*)i);
        usleep(100000);
    }

    pthread_mutex_lock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_create(&threads[i], 0, runTask, (void*)i);
        usleep(100000);
    }
    //pthread_cond_broadcast(&cond);
    for(i = 0; i < N; ++i)
        pthread_cond_signal(&cond);
    usleep(100000);
    pthread_mutex_unlock(&mutex);
    for(i = 0; i < N; ++i){
        pthread_join(threads[i], NULL);
    }
    for(i = 0; i < N; ++i){
        pthread_join(threadsForWaits[i], NULL);
    }
    return 0;

} 

輸出:

0 wait first get lock
1 wait first get lock
2 wait first get lock
3 wait first get lock
4 wait first get lock
5 wait first get lock
6 wait first get lock
7 wait first get lock
8 wait first get lock
9 wait first get lock
0 get lock
1 get lock
2 get lock
3 get lock
4 get lock
5 get lock
6 get lock
7 get lock
8 get lock
9 get lock
1 wait second get lock
2 wait second get lock
3 wait second get lock
0 wait second get lock
4 wait second get lock
5 wait second get lock
6 wait second get lock
7 wait second get lock
8 wait second get lock
9 wait second get lock

只能說等待執行緒轉移到阻塞執行緒之後的排列,看起來是沒啥規律 ([email protected][email protected]=)