1. 程式人生 > >共享鎖(S鎖)和排它鎖(X鎖)

共享鎖(S鎖)和排它鎖(X鎖)

threads latch rgs appears iou out lse 區別 private

共享鎖【S鎖】
又稱讀鎖,若事務T對數據對象A加上S鎖,則事務T可以讀A但不能修改A,其他事務只能再對A加S鎖,而不能加X鎖,直到T釋放A上的S鎖。這保證了其他事務可以讀A,但在T釋放A上的S鎖之前不能對A做任何修改。

排他鎖【X鎖】
又稱寫鎖。若事務T對數據對象A加上X鎖,事務T可以讀A也可以修改A,其他事務不能再對A加任何鎖,直到T釋放A上的鎖。這保證了其他事務在T釋放A上的鎖之前不能再讀取和修改A。

1、什麽是共享鎖和排它鎖

共享鎖就是允許多個線程同時獲取一個鎖,一個鎖可以同時被多個線程擁有。 排它鎖,也稱作獨占鎖,一個鎖在某一時刻只能被一個線程占有,其它線程必須等待鎖被釋放之後才可能獲取到鎖。 2、排它鎖和共享鎖實例 ReentrantLock就是一種排它鎖。CountDownLatch是一種共享鎖。這兩類都是單純的一類,即,要麽是排它鎖,要麽是共享鎖。 ReentrantReadWriteLock是同時包含排它鎖和共享鎖特性的一種鎖,這裏主要以ReentrantReadWriteLock為例來進行分析學習。我們使用ReentrantReadWriteLock的寫鎖時,使用的便是排它鎖的特性;使用ReentrantReadWriteLock的讀鎖時,使用的便是共享鎖的特性。 3、鎖的等待隊列組成 ReentrantReadWriteLock有一個讀鎖(ReadLock)和一個寫鎖(WriteLock)屬性,分別代表可重入讀寫鎖的讀鎖和寫鎖。有一個Sync屬性來表示這個鎖上的等待隊列。ReadLock和WriteLock各自也分別有一個Sync屬性表示在這個鎖上的隊列 通過構造函數來看, public ReentrantReadWriteLock(boolean fair) { sync = (fair)? new FairSync() : new NonfairSync(); readerLock = new ReadLock(this); writerLock = new WriteLock(this); } 在創建讀鎖和寫鎖對象的時候,會把這個可重入的讀寫鎖上的Sync屬性傳遞過去。 protected ReadLock(ReentrantReadWriteLock lock) { sync = lock.sync; } protected WriteLock(ReentrantReadWriteLock lock) { sync = lock.sync; } 所以,最終的效果是讀鎖和寫鎖使用的是同一個線程等待隊列。這個隊列就是通過我們在前面介紹過的AbstractQueuedSynchronizer實現的。 4、鎖的狀態 既然讀鎖和寫鎖使用的是同一個等待隊列,那麽這裏要如何區分一個鎖的讀狀態(有多少個線程正在讀這個鎖)和寫狀態(是否被加了寫鎖,哪個線程正在寫這個鎖)。 首先每個鎖都有一個exclusiveOwnerThread屬性,這是繼承自AbstractQueuedSynchronizer,來表示當前擁有這個鎖的線程。那麽,剩下的主要問題就是確定,有多少個線程正在讀這個鎖,以及是否加了寫鎖。 這裏可以通過線程獲取鎖時執行的邏輯來看,下面是線程獲取讀鎖時會執行的一部分代碼。 final boolean tryReadLock() { Thread current = Thread.currentThread(); for (;;) { int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return false ; if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded" ); if (compareAndSetState(c, c + SHARED_UNIT)) { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); rh.count++; return true ; } } } 註意這個函數的調用exclusiveCount(c) ,用來計算這個鎖當前的寫加鎖次數(同一個進程多次進入會累加)。代碼如下 /** Returns the number of shared holds represented in count */ static int sharedCount( int c) { return c >>> SHARED_SHIFT; } /** Returns the number of exclusive holds represented in count */ static int exclusiveCount (int c) { return c & EXCLUSIVE_MASK; } 相關常量的定義如下 static final int SHARED_SHIFT = 16; static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1; 如果從二進制來看EXCLUSIVE_MASK的表示,這個值的低16位全是1,而高16位則全是0,所以exclusiveCount是把state的低16位取出來,表示當前這個鎖的寫鎖加鎖次數。 再來看sharedCount,取出了state的高16位,用來表示這個鎖的讀鎖加鎖次數。所以,這裏是用state的高16位和低16位來分別表示這個鎖上的讀鎖和寫鎖的加鎖次數。 現在再回頭來看tryReadLock實現,首先檢查這個鎖上是否被加了寫鎖,同時檢查加寫鎖的是不是當前線程。如果不是被當前線程加了寫鎖,那麽試圖加讀鎖就失敗了。如果沒有被加寫鎖,或者是被當前線程加了寫鎖,那麽就把讀鎖加鎖次數加1,通過compareAndSetState(c, c + SHARED_UNIT)來實現 SHARED_UNIT的定義如下,剛好實現了高16位的加1操作。 static final int SHARED_UNIT = (1 << SHARED_SHIFT); 5、線程阻塞和喚醒的時機 線程的阻塞和訪問其他鎖的時機相似,在線程視圖獲取鎖,但這個鎖又被其它線程占領無法獲取成功時,線程就會進入這個鎖對象的等待隊列中,並且線程被阻塞,等待前面線程釋放鎖時被喚醒。 但因為加讀鎖和加寫鎖進入等待隊列時存在一定的區別,加讀鎖時,final Node node = addWaiter(Node.SHARED);節點的nextWaiter指向一個共享節點,表明當前這個線程是處於共享狀態進入等待隊列。 加寫鎖時如下, public final void acquire (int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 線程是處於排它狀態進入等待隊列的。 在線程的阻塞上,讀鎖和寫鎖的時機相似,但在線程的喚醒上,讀鎖和寫鎖則存在較大的差別。 讀鎖通過AbstractQueuedSynchronizer的doAcquireShared來完成獲取鎖的動作。 private void doAcquireShared( int arg) { final Node node = addWaiter(Node.SHARED); try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null ; // help GC if (interrupted) selfInterrupt(); return ; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true ; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } } 在tryAcquireShared獲取讀鎖成功後(返回正數表示獲取成功),有一個setHeadAndPropagate的函數調用。 寫鎖通過AbstractQueuedSynchronizer的acquire來實現鎖的獲取動作。 public final void acquire( int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } 如果tryAcquire獲取成功則直接返回,否則把線程加入到鎖的等待隊列中。和一般意義上的ReentrantLock的原理一樣。 所以在加鎖上,主要的差別在於這個setHeadAndPropagate方法,其代碼如下 private void setHeadAndPropagate (Node node, int propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don‘t know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } } 主要操作是把這個節點設為頭節點(成為頭節點,則表示不在等待隊列中,因為獲取鎖成功了),同時釋放鎖(doReleaseShared)。 下面來看doReleaseShared的實現 private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue ; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue ; // loop on failed CAS } if (h == head) // loop if head changed break ; } } 把頭節點的waitStatus這只為0或者Node.PROPAGATE,並且喚醒下一個線程,然後就結束了。 總結一下,就是一個線程在獲取讀鎖後,會喚醒鎖的等待隊列中的第一個線程。如果這個被喚醒的線程是在獲取讀鎖時被阻塞的,那麽被喚醒後,就會在for循環中,又執行到setHeadAndPropagate,這樣就實現了讀鎖獲取時的傳遞喚醒。這種傳遞在遇到一個因為獲取寫鎖被阻塞的線程節點時被終止。 下面通過代碼來理解這種等待和線程喚醒順序。 package lynn.lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class TestThread extends Thread { private ReentrantReadWriteLock lock; private String threadName; private boolean isWriter ; public TestThread(ReentrantReadWriteLock lock, String name, boolean isWriter) { this.lock = lock; this.threadName = name; this.isWriter = isWriter; } @Override public void run() { while (true ) { try { if (isWriter ) { lock.writeLock().lock(); } else { lock.readLock().lock(); } if (isWriter ) { Thread. sleep
(3000); System. out.println("----------------------------" ); } System. out.println(System.currentTimeMillis() + ":" + threadName ); if (isWriter ) { Thread. sleep(3000); System. out.println("-----------------------------" ); } } catch (Exception e) { e.printStackTrace(); } finally { if (isWriter ) { lock.writeLock().unlock(); } else { lock.readLock().unlock(); } } break; } } } TestThread是一個自定義的線程類,在生成線程的時候,需要傳遞一個可重入的讀寫鎖對象進去,線程在執行時會先加鎖,然後進行內容輸出,然後釋放鎖。如果傳遞的是寫鎖,那線程在輸出結果前後會先沈睡3秒,便於區分輸出的結果時間。 package lynn.lock; import java.util.concurrent.locks.ReentrantReadWriteLock; public class Main { public static void blockByWriteLock() { ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); lock.writeLock().lock(); TestThread[] threads = new TestThread[10]; for (int i = 0; i < 10; i++) { boolean isWriter = (i + 1) % 4 == 0 ? true : false; TestThread thread = new TestThread(lock, "thread-" + (i + 1), isWriter); threads[i] = thread; } for (int i = 0; i < threads.length; i++) { threads[i].start(); } System. out
.println(System.currentTimeMillis() + ": block by write lock"); try { Thread. sleep(3000); } catch (Exception e) { e.printStackTrace(); } lock.writeLock().unlock(); } public static void main(String[] args) { blockByWriteLock
(); } } 在Main中構造了10個線程,由於這個鎖一開始是被主線程擁有,並且是在排它狀態下加鎖的,所以我們構造的10個線程,在一開始執行便是按照其編號從小到大在等待隊列中(1到10)。然後主線程打印結果,等待3秒後釋放鎖。由於前3個線程,編號1到3是處於共享狀態阻塞的,而第4個線程是處於排它狀態阻塞,所以,按照上面的喚醒順序,喚醒傳遞到第4個線程時就結束。 依次類推,理論上的打印順序是 :主線程 [1,2,3] 4 [5,6,7] 8 [9,10] 從下面的執行結果來看,也是符合我們的預期的。 技術分享圖片 6、讀線程之間的喚醒 如果一個線程在共享模式下獲取了鎖狀態,這個時候,它是否要喚醒其它在共享模式下等待在該鎖上的線程? 由於多個線程可以同時獲取共享鎖而不相互影響,所以,當一個線程在共享狀態下獲取了鎖之後,理論上是可以喚醒其它在共享狀態下等待該鎖的線程。但如果這個時候,在這個等待隊列中,既有共享狀態的線程,同時又有排它狀態的線程,這個時候又該如何喚醒? 實際上對於鎖來說,在共享狀態下,一個線程無論是獲取還是釋放鎖的時候,都會試著去喚醒下一個等待在這個鎖上的節點(通過上面的doAcquireShared代碼能看出)。如果下一個線程也是處於共享狀態等待在鎖上,那麽這個線程就會被喚醒,然後接著試著去喚醒下一個等待在這個鎖上的線程,這種喚醒動作會一直持續下去,直到遇到一個在排它狀態下阻塞在這個鎖上的線程,或者等待隊列全部被釋放為止。 因為線程是在一個FIFO的等待隊列中,所以,這這樣一個一個往後傳遞,就能保證喚醒被傳遞下去。

共享鎖(S鎖)和排它鎖(X鎖)