1. 程式人生 > >多執行緒之美8一 AbstractQueuedSynchronizer原始碼分析<二>

多執行緒之美8一 AbstractQueuedSynchronizer原始碼分析<二>

目錄

AQS的原始碼分析 <二>

該篇主要分析AQS的ConditionObject,是AQS的內部類,實現等待通知機制。

1、條件佇列

條件佇列與AQS中的同步佇列有所不同,結構圖如下:

兩者區別:

  • 1、連結串列結構不同,條件佇列是單向連結串列,同步佇列是雙向連結串列。
  • 2、兩個佇列中等待條件不同,條件佇列中執行緒是已經獲取到鎖,主動呼叫await方法釋放鎖,掛起當前執行緒,等待某個條件(如IO,mq訊息等),同步佇列中的執行緒是等待獲取鎖,在獲取鎖失敗後掛起等待鎖可用。

兩者聯絡:

當等待的某個條件完成,其他執行緒呼叫signal方法,通知掛起在條件佇列中的執行緒,會將條件佇列中該node移出,加入到同步佇列中,node的ws狀態由Node.CONDITION改為0 ,開始等待鎖。

2、ConditionObject

ConditionObject 和 Node一樣,都是AQS的內部類, ConditionObject實現Condition介面,主要實現執行緒呼叫 await和signal ,實現執行緒條件阻塞和通知機制,Condition物件通過 Lock子類呼叫newConditon方法獲取,以

ReentrantLock為例,程式碼如下:

ReentrantLock lock  = new ReentrantLock();
Condition condition =  lock.newCondition();

可見排他鎖的newCondition方法返回的是ConditionObject物件

final ConditionObject newCondition() {
    return new ConditionObject();
}

簡單生產者消費示例程式碼:

package AQS;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
 * @author zdd
 * 2019/12/30 下午
 * Description: 利用ReentrantLock和Condition實現生產者消費者
 */
public class ConditionTest {
   static ReentrantLock lock  = new ReentrantLock();
   static Condition condition =  lock.newCondition();
    public static void main(String[] args) {
       //資源類
        Apple apple = new Apple();
     //1.開啟生產者執行緒
        new Thread(()-> {
            for (;;) {
                lock.lock();
                try {
                    //蘋果沒有被消費,吃完通知我,我再生產哦
                    if (apple.getNumber() > 0) {
                        condition.await();
                    }
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("生產一個蘋果");
                    apple.addNumber();
                    //通知消費執行緒消費
                    condition.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        },"producer").start();
      //2.開啟消費者執行緒
        new Thread(()-> {
            for (;;) {
                lock.lock();
                try {
                    //蘋果數量為0,掛起等待生產蘋果,有蘋果了會通知
                    if(apple.getNumber() == 0) {
                        condition.await();
                    }
                    TimeUnit.SECONDS.sleep(1);
                    System.out.println("消費一個蘋果");
                    apple.decreNumber();
                    //通知生產執行緒生產
                    condition.signal();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    lock.unlock();
                }
            }
        },"consumer").start();

    }
  //定義蘋果內部類 
   static class Apple {
        //記錄蘋果數量
        private Integer number =0;
        public void addNumber() {
            number++;
            System.out.println(Thread.currentThread().getName() +"當前蘋果數量:"+number );
        }
        public void decreNumber() {
            number--;
            System.out.println(Thread.currentThread().getName() +"當前蘋果數量:"+number);
        }
        public Integer getNumber() {
            return number;
        }
    }
}

執行結果如下圖:

2.1、 await() 方法

當前執行緒是在已經獲取鎖的情況下,呼叫await方法主動釋放鎖,掛起當前執行緒,等待某個條件(IO,mq訊息等)喚醒,再去競爭獲取鎖的過程。該方法會將當前執行緒封裝到node節點中,新增到Condition條件佇列中,釋放鎖資源,並掛起當前執行緒。

具體執行步驟如下:

1、執行緒封裝到node中,並新增到Condition條件佇列中,ws =-2 即為Node.CONDITION。

2、釋放鎖。

3、將自己阻塞掛起,如果執行緒被喚醒,首先檢查自己是被中斷喚醒的不。如果是被中斷喚醒,跳出while迴圈;如果是被其他執行緒signal喚醒,則判斷當前執行緒所在node是否被加入到同步等待佇列,已在同步佇列中也跳出while迴圈,否則繼續掛起,signal喚醒邏輯會將condition條件佇列node 移出,加入到同步佇列中,去等待獲取鎖。

4,執行緒被喚醒,執行acquireQueued方法,執行緒會嘗試獲取鎖,若失敗則在同步佇列中找到安全位置阻塞,成功則從呼叫await()方法處繼續向下執行,返回值表示當前執行緒是否被中斷過。

public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
               //掛起當前執行緒
                LockSupport.park(this);
              // 被喚醒: 1,被其他執行緒喚醒,2,中斷喚醒,
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
           //1,如果被signal正常喚醒執行acquireQueued,返回false,如果獲取到鎖就繼續執行呼叫await後面的程式碼了,未獲取到鎖就在同步佇列中繼續掛起等待鎖執行了
           //2,如果被中斷喚醒的,acquireQueued 返回true
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
               //執行緒在被signal後,再被中斷的
                interruptMode = REINTERRUPT;
         //  後面程式碼處理的是被中斷喚醒的情況
            if (node.nextWaiter != null) // clean up if cancelled
              //如果nextWaiter!=null,則表示還在條件佇列中,清理一下所有被取消node
              //什麼情況下會進入該if判斷中,如果是正常被signal的,會將該node從條件佇列移出加入到同步佇列中的, nextWaiter 一定為null,那就是被異常中斷情況,
                unlinkCancelledWaiters();
            if (interruptMode != 0)
               //響應中斷模式
                reportInterruptAfterWait(interruptMode);
        }

第1步,執行addConditionWaiter方法,主要邏輯是將執行緒封裝為Node,並新增到條件佇列中

        private Node addConditionWaiter() {
          //1.獲取佇列中最後一個節點
            Node t = lastWaiter;
            //2.如果最後一個節點被取消,清除出隊
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            //3. t 指向最新有效的節點,也可能條件佇列為空,t==null
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

第2步,完全釋放鎖 fullyRelease,將同步狀態state 設定為初始值0,這裡考慮到有多次重入獲取鎖情況,state >1,這時需完全釋放鎖。

  final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            //1,釋放鎖
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
            //2,釋放鎖失敗,將條件佇列中的節點標記為取消
                node.waitStatus = Node.CANCELLED;
        }
    }

isOnSyncQueue 判斷node是否在同步佇列中

 final boolean isOnSyncQueue(Node node) {
       //1,這2種情況肯定沒有在同步佇列中
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
       //3.從同步佇列尾節點開始對比,看是否在同步佇列中
        return findNodeFromTail(node);
    }

findNodeFromTail 從後向前尋找

  private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

線上程被喚醒後,檢查掛起期間是否被中斷

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) :
        0;
}

如果執行緒被中斷了,那就需要將在條件佇列中等待的該節點執行 transferAfterCancelledWait

 final boolean transferAfterCancelledWait(Node node) {
       // 判斷是否是被signal通知喚醒的,會更新為0,更新成功,執行入隊操作(加入同步佇列)
        if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
            enq(node);
            return true;
        }
        while (!isOnSyncQueue(node))
          //未在同步佇列中,讓出處理器,執行緒回到就緒態,等待下一次分配cpu排程
            Thread.yield();
        return false;
    }

最後根據不同的中斷值做出相應處理

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)
        //1,直接丟擲中斷異常
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)
        //2,中斷標誌
        selfInterrupt();
}

2.2、signal方法

就是將條件佇列中的node移出,加入到同步佇列等待獲取鎖的過程。

流程圖如下:

  public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }
   private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
              // 1、將first節點執行出隊操作
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
             //2,如果條件佇列中有ws =-2的節點,肯定會移出一個到同步佇列中
        }
final boolean transferForSignal(Node node) {
       //1,將node ws更新為0 ,如果node 狀態不等於CONDITION,一定是被取消了
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
       //2,加入到同步佇列中,返回的p是node的pre 
        Node p = enq(node);
        int ws = p.waitStatus;
       //3,如果前置節點被取消,或者更新p的 ws =-1 失敗,直接喚醒執行緒,否則等待前置節點喚醒自己
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
           //喚醒執行緒
            LockSupport.unpark(node.thread);
        return true;
    }

3、總結

1、Condition提供的阻塞通知機制與Object類兩者對比:

  • 方法不同,Condition提供方法有 await(), signal(),signalAll(), Object類提供的是wait(),notify() , notifyAll()
  • 配合使用物件不同,Condition條件需要和Lock配合使用,Object類需和Synchronized關鍵字配合。
  • 多條件, Condition可實現多個條件,即建立多個Condition物件,可以每個Condition物件對應一種條件,從而有選擇的實現喚醒通知,Object類的要喚醒一個阻塞執行緒,只能在一個條件佇列中,喚醒是隨機的,沒有Condition使用靈活。

2、注意區別Condition條件佇列與同步佇列兩者的區別,2個佇列中執行緒等待條件不