深入理解條件變數Condition
序
可重入鎖(ReentrantLock)是 synchronized 關鍵字的擴充套件,更加靈活。還有一種ReentrantLock應用場景是和Condition搭配使用,實現多執行緒環境下等待狀態條件的功能。 Object.wait 和 Object.notify 是和 synchronized 配合使用的,條件變數Condition.await 和 Condition.signal 是和ReentrantLock相關聯的。
接下來先通過一個Demo看看Condition的用法,然後列舉兩個應用的地方,最後分析其原始碼實現。
一個簡單Demo
先通過一個Demo看看怎麼使用Condition,主執行緒通知條件滿足,通過另一個執行緒繼續執行,可以看到的是Condition.await/signal方法需要和一個ReentrantLock繫結。
public class ReenterLockCondition implements Runnable { public static ReentrantLock lock = new ReentrantLock(); public static Condition condition = lock.newCondition(); @Override public void run() { try { lock.lock(); condition.await(); System.out.println(String.format("條件滿足,執行緒%s執行!", Thread.currentThread().getName())); } catch (InterruptedException e) { e.printStackTrace(); } finally { lock.unlock(); } } public static void main(String args[]) throws InterruptedException { ReenterLockCondition reenterLockCondition = new ReenterLockCondition(); Thread thread1 = new Thread(reenterLockCondition); thread1.setName("T1"); thread1.start(); Thread.sleep(2000); System.out.println("通知T1條件滿足"); lock.lock(); condition.signal(); lock.unlock(); } }
應用舉例:JDK ArrayBlockingQueue

JDK併發包中的 ArrayBlockingQueue 就使用了Condition來同步佇列的空/滿狀態。先看條件變數的定義:
/** Main lock guarding all access */ final ReentrantLock lock; /** Condition for waiting takes */ private final Condition notEmpty; /** Condition for waiting puts */ private final Condition notFull; public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; lock = new ReentrantLock(fair); notEmpty = lock.newCondition(); notFull =lock.newCondition(); }
ArrayBlockingQueue 構造的時候初始化了2個條件變數:非空,非滿,他們都是和同一個重入鎖關聯的。
元素入隊的時候,如果佇列一直滿的話就一直阻塞等待非滿條件為真,否則的話就插入對應的元素,並且通知非空條件為真。
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); enqueue(e); } finally { lock.unlock(); } } private void enqueue(E x) { // assert lock.getHoldCount() == 1; // assert items[putIndex] == null; final Object[] items = this.items; items[putIndex] = x; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }
同樣的,出隊的時候,如果佇列沒有元素就等待非空的條件為真,否則出隊,並通知非滿條件為真。
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); return dequeue(); } finally { lock.unlock(); } } private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; items[takeIndex] = null; if (++takeIndex == items.length) takeIndex = 0; count--; if (itrs != null) itrs.elementDequeued(); notFull.signal(); return x; }
應用舉例:Kafaka BufferPool
KafkaProducer裡面有一個buffer pool的概念,BufferPool裡面記憶體分配,回收過程也有用到條件變數。
public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException { this.lock.lock(); try { // check if we have a free buffer of the right size pooled if (size == poolableSize && !this.free.isEmpty()) return this.free.pollFirst(); // now check if the request is immediately satisfiable with the // memory on hand or if we need to block int freeListSize = this.free.size() * this.poolableSize; if (this.availableMemory + freeListSize >= size) { // 不斷釋放free佇列中的buffer,直到可用記憶體>size // we have enough unallocated or pooled memory to immediately // satisfy the request freeUp(size); this.availableMemory -= size; lock.unlock(); // 使用的不是free佇列中的buffer,而是直接分配HeapByteBuffer return ByteBuffer.allocate(size); } else { // 沒有足夠空間,阻塞 // we are out of memory and will have to block int accumulated = 0; ByteBuffer buffer = null; Condition moreMemory = this.lock.newCondition(); long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs); this.waiters.addLast(moreMemory); // loop over and over until we have a buffer or have reserved // enough memory to allocate one while (accumulated < size) { long startWaitNs = time.nanoseconds(); long timeNs; boolean waitingTimeElapsed; try { waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS); } catch (InterruptedException e) { this.waiters.remove(moreMemory); throw e; } finally { long endWaitNs = time.nanoseconds(); timeNs = Math.max(0L, endWaitNs - startWaitNs); this.waitTime.record(timeNs, time.milliseconds()); } if (waitingTimeElapsed) { this.waiters.remove(moreMemory); throw new TimeoutException("Failed to allocate memory within the configured max blocking time " + maxTimeToBlockMs + " ms."); } remainingTimeToBlockNs -= timeNs; // check if we can satisfy this request from the free list, // otherwise allocate memory if (accumulated == 0 && size == this.poolableSize && !this.free.isEmpty()) { // just grab a buffer from the free list buffer = this.free.pollFirst(); accumulated = size; } else { // 先分配一部分空間 // we'll need to allocate memory, but we may only get // part of what we need on this iteration freeUp(size - accumulated); int got = (int) Math.min(size - accumulated, this.availableMemory); this.availableMemory -= got; accumulated += got; } } // 成功分配空間後,移除條件變數 // remove the condition for this thread to let the next thread // in line start getting memory Condition removed = this.waiters.removeFirst(); if (removed != moreMemory) throw new IllegalStateException("Wrong condition: this shouldn't happen."); // 如果還有剩餘空間,還有等待執行緒,則喚醒 // signal any additional waiters if there is more memory left // over for them if (this.availableMemory > 0 || !this.free.isEmpty()) { if (!this.waiters.isEmpty()) this.waiters.peekFirst().signal(); } // unlock and return the buffer lock.unlock(); if (buffer == null) return ByteBuffer.allocate(size); else return buffer; } } finally { if (lock.isHeldByCurrentThread()) lock.unlock(); } } public void deallocate(ByteBuffer buffer, int size) { lock.lock(); try { // 如果是poolableSize大小的buffer,則入free佇列管理 if (size == this.poolableSize && size == buffer.capacity()) { buffer.clear(); this.free.add(buffer); } else { // 否則直接增加大小,分配的buffer由GC回收 this.availableMemory += size; } // 喚醒一個因記憶體分配得不到滿足而阻塞的執行緒 Condition moreMem = this.waiters.peekFirst(); if (moreMem != null) moreMem.signal(); } finally { lock.unlock(); } }
實現分析
接下來看看條件變數是怎麼實現的?可重入鎖關聯的條件變數實現類是AQS內部類ConditionObject,接下來通過其中的await和signal方法的原始碼看看Condition的實現。
public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); // 釋放關聯的鎖 int savedState = fullyRelease(node); int interruptMode = 0; // 如果該節點沒有放入同步佇列(sync queue),則阻塞等待 while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } // 節點進入了同步佇列,則爭用鎖,鎖獲取成功後,await就退出了 if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); } // 新增一個條件節點 private Node addConditionWaiter() { Node t = lastWaiter; // If lastWaiter is cancelled, clean out. if (t != null && t.waitStatus != Node.CONDITION) { unlinkCancelledWaiters(); t = lastWaiter; } Node node = new Node(Thread.currentThread(), Node.CONDITION); if (t == null) firstWaiter = node; else t.nextWaiter = node; lastWaiter = node; return node; } public final void signal() { // 先判斷當前執行緒是否獲取了鎖,否則異常 if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); } // 摘除頭節點,把頭節點移到同步佇列 private void doSignal(Node first) { do { if ( (firstWaiter = first.nextWaiter) == null) lastWaiter = null; first.nextWaiter = null; } while (!transferForSignal(first) && (first = firstWaiter) != null); } final boolean transferForSignal(Node node) { // CAS 將狀態設定為0,0是一個無效的狀態 if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)) return false; /* * Splice onto queue and try to set waitStatus of predecessor to * indicate that thread is (probably) waiting. If cancelled or * attempt to set waitStatus fails, wake up to resync (in which * case the waitStatus can be transiently and harmlessly wrong). */ Node p = enq(node); int ws = p.waitStatus; // 喚醒執行緒 if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)) LockSupport.unpark(node.thread); return true; } // 這個方法是AQS中的,node進入同步佇列 private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
Linux環境下的條件變數
瞭解Linux環境程式設計的會想到其中也有條件變數的機制來實現執行緒之間的同步,和Java中的基本上語義是相同的,在等待條件的時候都會原子的釋放鎖。


相關閱讀
ofollow,noindex" target="_blank">可重入鎖 ReentrantLock 原始碼閱讀
pthread_cond_wait" target="_blank" rel="nofollow,noindex">pthread_cond_wait
《Unix高階環境程式設計》