1. 程式人生 > >Condition的await和signal原理詳解(Condition下的生產消費模型)

Condition的await和signal原理詳解(Condition下的生產消費模型)

Condition的await和signal與Object的wait與notify區別:

任何一個Java物件都天然繼承於Object類,線上程間實現通訊會用到Object的幾個方法,如wait(),wait(long timeout),wait(long timeout,int nanos)與notify(),notifyAll()這幾個方法實現等待通知機制,同樣的,在Java Lock體系有同樣的方法實現等待通知機制。從整體上看Object的wait與notify是與物件監視器(synchronized同步程式碼塊或者同步方法中)配合完成執行緒間的等待通知機制,而Condition的await和signal與Lock配合完成等待通知機制,前者是JVM底層級別(不可以看原始碼),後者是Java語言級別,具有更高的可控制性和擴充套件性(可以看原始碼)。
兩者在功能特性上還有如下不同:
1.Condition支援不響應中斷,而Object不支援,也就是Object只要有中斷就要響應。
2.Condition支援多個等待佇列(new多個Condition物件),而Object只有一個等待佇列,但兩者都只要一個同步佇列;
3.Condition支援截止時間設定,而Object是超時時間設定,支援截止時間設定,不用計算需要等多久。

Condition :

public interface Lock {
   Condition newCondition();
   }
public interface Condition {
	//當前執行緒進入等待狀態,直到被中斷或喚醒
	 void await() throws InterruptedException;

	//不響應中斷(即使有中斷,也不會拋異常),直到被喚醒
	 void awaitUninterruptibly();
	 
	 //當前執行緒進入等待狀態直到被通知,中斷或者超時;
	 long awaitNanos(long nanosTimeout) throws
InterruptedException; //同Object.wait(long timeout),多了自定義時間單位 //直到超時,中斷,被喚醒 boolean await(long time, TimeUnit unit) throws InterruptedException; //支援設定截止時間,直到到截止時間、中斷、被喚醒 boolean awaitUntil(Date deadline) throws InterruptedException; //喚醒一個等待在Condition(等待佇列)上的執行緒,將該執行緒由等待佇列轉移到同步佇列 void signal()
; //將所有等待在condition上的執行緒全部轉移到同步佇列中 void signalAll();

Condition實現原理分析:


等待佇列

建立一個Condition物件是通過lock.newCondition( ),而這個方法實際上會new出一個ConditionObject物件,該類是AQS的一個內部類(lock的實現原理依賴AQS)。

 public class ConditionObject implements Condition, java.io.Serializable {
 		/** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

在ConditionObject 通過持有等待佇列的頭尾指標來管理等待佇列。這個Node複用了AQS的Node類,也就是等待佇列和同步佇列的結點一個Node類。
Node類中有這樣一個屬性:

//後繼節點
Node nextWaiter;

nextWaiter是等待佇列中標識下一個結點,也就是說Node結點的prev和next等待佇列沒有用到。
等待佇列是一個單向佇列,而同步佇列是一個雙向佇列。

package CODE.多執行緒;


import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class Await {
    public static void main(String[] args) {
        Lock lock=new ReentrantLock();
        Condition condition=lock.newCondition();
        for(int i=0;i<10;i++)
        {
            Thread thread=new Thread(()->
            {
                lock.lock();
                try {
                    condition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    lock.unlock();
                }
            });
            thread.start();
        }
    }
}

在這裡插入圖片描述
Debug模式下可以看見:
1.呼叫condition.wait方法後執行緒依次尾插到等待佇列中,對上例來說,順序依次是Thread-0,Thread-1,Thread-2…Thread-9;
2.等待你佇列是一個單向佇列,只有nextWaiter。
由於condition是new出來的,也就是說當多次呼叫lock.newCondition()可以建立多個condition物件,也就是一個lock可以有多個等待佇列。
用Object類wait在Object物件監視器上只有一個同步佇列和一個等待佇列,而在併發包中的Lock中有一個同步佇列多個等待佇列。

在這裡插入圖片描述

Condition的應用:實現有界佇列
有界佇列是一種特殊的佇列,當佇列為空時,佇列的獲取(刪除)操作將會阻塞獲取(刪除)執行緒,直到佇列中有新增結點;當佇列已滿時,佇列的插入操作將會阻塞新增執行緒,直到隊列出現空位。
程式碼如下:

package CODE.多執行緒;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

//Conditon實現有界佇列
class BoundQueue<T>
{
    private Object[] items;
    private int counts=0;  //intems中元素個數
    private Lock lock=new ReentrantLock();
    private Condition fullCondition=lock.newCondition();
    private Condition emptyCondition=lock.newCondition();
    public BoundQueue(int size)
    {
        items=new Object[size];
    }

    //向數組裡新增元素,如果陣列滿,進入等待狀態
    public void add(T t,int addIndex) throws InterruptedException {
        try
        {
            lock.lock();
            //陣列已滿,新增執行緒需要進入等待狀態
            while(counts==items.length)
            {
                System.out.println("陣列已滿,需要等待");
                fullCondition.await();
            }
            System.out.println(Thread.currentThread().getName()+"在新增元素");
            items[addIndex]=t;
            counts++;
            //元素新增完畢,需要喚醒清空佇列
            emptyCondition.signal();
        }finally {
            lock.unlock();
        }

    }
    //刪除元素方法,如果當前陣列為空,移除執行緒進入等待狀態直到陣列不為空
    public T remove(int removeIndex) throws InterruptedException {
        try
        {
            lock.lock();
            while(counts==0)
            {
                System.out.println("陣列已空,刪除等待");
               emptyCondition.await();
            }
            Object x=items[removeIndex];
            System.out.println(Thread.currentThread().getName()+"在刪除元素");
            counts--;
            //喚醒新增執行緒
            fullCondition.signal();
            return (T)x; //從大型別到小型別需要強轉
        }finally {
            lock.unlock();
        }
    }

}
class MyThread implements Runnable
{
    private BoundQueue boundQueue;
    private int flag;
    public MyThread(int flag,BoundQueue boundQueue)
    {
        this.boundQueue=boundQueue;
        this.flag=flag;
    }
    public void run()
    {
        if(flag==1)
        {
            try {
                boundQueue.add("asb",0);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        else
        {
            try {
                boundQueue.remove(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
public class Bound {
    public static void main(String[] args) {
        BoundQueue<String> boundQueue=new BoundQueue<>(2);
        MyThread addthread=new MyThread(1,boundQueue);
        MyThread removethread=new MyThread(0,boundQueue);
        new Thread(removethread,"刪除執行緒1").start();
        new Thread(addthread,"新增執行緒1").start();
        new Thread(addthread,"新增執行緒2").start();
    }

}

在這裡插入圖片描述

await實現原理


原始碼如下:

 public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
                //將當前執行緒包裝成Node結點尾插到等待佇列
            Node node = addConditionWaiter();
            //由於尾插到等待佇列,呼叫await會釋放鎖,那麼當前執行緒釋放所佔用的lock,釋放後喚醒同步佇列中下一個結點
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
            //當前結點不再同步佇列時,阻塞該執行緒,進入等待狀態
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            //退出迴圈的條件:被喚醒進入同步佇列和被中斷
            //被喚醒後進入同步佇列自旋競爭同步狀態
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
                //處理中斷
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

呼叫condition.await( )方法後:
1.當前執行緒釋放lock,進入等待佇列,並且喚醒同步佇列中下一個結點;
2.當前執行緒被signal/signalAll後從等待佇列移至同步佇列,直到獲取lock才從await方法返回或者在等待時被中斷會做中斷處理。

接下來思考幾個問題:
1.當前執行緒如何新增到等待佇列中?
2.當前執行緒釋放鎖的過程?
3.如何從await方法退出?

1.呼叫addConditionWaiter將當前執行緒新增到等待佇列中,
原始碼如下:

 private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //將當前執行緒包裝成Node結點
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            //等待佇列為空,firstWaiter指向當前執行緒結點
            if (t == null)
                firstWaiter = node;
            else
            	//等待佇列不為空,尾插到等待佇列
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

addConditionWaiter():
1.如果等待佇列最後一個結點是取消狀態,將這個執行緒移除;
2.將當前執行緒包裝成Node結點,如果等待佇列為空(firstWaiter為null),將firstWaiter指向新包裝的Node結點,否則,將當前執行緒尾插到等待佇列,更新
lastWaiter(尾節點)。
總之:通過尾插的方式將當前執行緒封裝的Node結點插入到等待佇列中,同時,可以發現等待佇列是一個沒有頭結點的佇列。
等待佇列是一個無頭結點單向的鏈式佇列;
同步佇列是一個有頭尾結點的雙向鏈式佇列。

2.釋放當前鎖,fullyRelease()
噹噹前執行緒結點尾插到同步佇列後,會釋放lock鎖,喚醒同步佇列中下一個結點,原始碼如下:

 /**
     * Invokes release with current state value; returns saved state.
     * Cancels node and throws exception on failure.
     * @param node the condition node for this wait
     * @return previous sync state
     */
    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            //呼叫AQS的釋放同步狀態方法release(),並喚醒同步佇列後繼結點
            if (release(savedState)) {
                //成功釋放鎖狀態
                failed = false;
                return savedState;
            } else {
            //釋放同步狀態失敗拋異常
                throw new IllegalMonitorStateException();
            }
        } finally {
        //釋放同步狀態後將當前結點設定為取消狀態
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

fullyRelease():
1.呼叫AQ的模板方法release方法釋放同步狀態並且喚醒在同步佇列中頭結點的後繼結點。
2.該執行緒釋放lock成功則正常返回,否則拋異常。

3.如何從await方法退出

while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }

注意while迴圈退出條件:該結點在同步佇列和該執行緒在等待過程被中斷。
當第一次呼叫condition.await(),會進入while迴圈(這時已經釋放鎖,進入等待佇列),然後通過LockSupport.park(this)使當前執行緒阻塞進入等待狀態。
如果有執行緒呼叫codition的signal或者signalAll方法該執行緒會進入到同步佇列,while迴圈為false,退出迴圈;
如果當前執行緒在等待的過程被中斷會break。

總之:當前執行緒被中斷或者呼叫condition.signal/condition.signalAll方法使當前執行緒移到同步佇列,會退出while迴圈。退出while迴圈後,如果在同步佇列中呼叫acquireQueued(node, savedState)自旋獲取同步狀態,直至執行緒獲取鎖退出awaqit方法;退出while迴圈後,如果被中斷則處理中斷異常。

在這裡插入圖片描述

當呼叫condition.await方法的執行緒必須是已經獲取了lock,也就當前執行緒是同步佇列的頭結點。呼叫await後,會使當前執行緒封裝成Node尾插到等待佇列中。

signal/signalAll實現原理

當呼叫condition.signal()會使等待佇列等待時間的執行緒結點也就是頭結點移動到同步佇列;當呼叫condition.signalAll()會使等待佇列中所有結點移動到同步佇列中。

signal原始碼為:

/**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
        //當前執行緒是否持有lock
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            //獲取等待佇列中第一個結點,第一個結點不為空,doSignal()
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

doSignal()

/**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
            //先將等待佇列第一個結點指向下一個結點,如果為空,證明等待佇列只有當前執行緒,那麼將lastWaiter指向null
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } 
            //transferForSignal方法對該執行緒節點做真正的處理
            while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

transferForSignal()

 /**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
         //首先將結點狀態設定為0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;  //失敗直接返回false

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
         //將結點使用enq尾插到同步佇列中
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

Condition.signal():
1.呼叫該方法的執行緒必須獲取到lock鎖;
2.呼叫該方法使等待佇列的頭結點即等待時間最長的執行緒結點尾插到同步佇列,移到同步佇列才有機會使得等待佇列被喚醒,即從await方法中的LockSupport.park(this)方法中返回,從而有機會使得呼叫await方法的執行緒成功。
在這裡插入圖片描述
signalAll( )

/**
         * Moves all threads from the wait queue for this condition to
         * the wait queue for the owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signalAll() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignalAll(first);
        }
 /**
         * Removes and transfers all nodes.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);  //喚醒了所有結點
        }

signalAll()將等待佇列中每一個結點都移到同步佇列中,即“通知”當前呼叫condition.await()方法的每一個執行緒。

Condition機制實現生產–消費者模型

package CODE.多執行緒;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

////Condition機制實現生產---消費者模型
class Goods1
{
    private String goodsname;
    private int maxCount;  //產品// 最大數量
    private int counts;  //當前產品數量
    private Lock lock=new ReentrantLock();
    private Condition proCondition=lock.newCondition();  //生產者等待佇列
    private Condition consumerCondition=lock.newCondition(); //消費者等待佇列
    public Goods1(int maxCount)
    {
        this.maxCount=maxCount;
    }

    //生產產品
    public void set(String goodsname)
    {
        lock.lock();
        try
        {
            while(counts==maxCount)
            {
                System.out.println("產品已達最大數量,稍後生產");
                try {
                    proCondition.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            try {
                Thread.sleep(20);
            } catch (InterruptedException e) {
                e.