1. 程式人生 > >多執行緒高併發程式設計(6) -- Semaphere、Exchanger原始碼分析

多執行緒高併發程式設計(6) -- Semaphere、Exchanger原始碼分析

一.Semaphere

  1.概念

  一個計數訊號量。在概念上,訊號量維持一組許可證。如果有必要,每個acquire()都會阻塞,直到許可證可用,然後才能使用它。每個release()新增許可證,潛在地釋放阻塞獲取方。但是,沒有使用實際的許可證物件;Semaphore只保留可用數量的計數,並相應地執行。即一個Semaphore維護了一組permits【許可證】。每次呼叫acquire()方法都會阻塞,直到獲取到許可證。每次呼叫release()方法都會新增一個許可證,也就是釋放一個被阻塞的獲取者。但是實際上並不存在這個許可證,Semaphore僅僅是記錄可用資源的數量,並且做出對應的行為(有資源就獲取,沒有資源就阻塞)。

  訊號量通常用於限制執行緒數,而不是訪問某些(物理或邏輯)資源。

  • 執行緒池控制的是執行緒數量,而訊號量控制的是併發數量,雖然說看起來一樣,但兩者還是有區別的。

  • 訊號量類似於鎖機制,訊號量的呼叫,當達到數量後,執行緒還是存在的,只是被掛起了而已。而執行緒池,同時執行的執行緒數量是固定的,超過了數量的只能等待。

  在獲得專案之前,每個執行緒必須從訊號量獲取許可證,以確保某個專案可用。當執行緒完成該專案後,它將返回到池中,並將許可證返回到訊號量,允許另一個執行緒獲取該專案。請注意,當呼叫acquire()時,不會保持同步鎖定,因為這將阻止某個專案返回到池中。訊號量封裝了限制對池的訪問所需的同步,與保持池本身一致性所需的任何同步分開。【即將限制對池的訪問和對池中資料的操作所需要的鎖分開】。

  訊號量被初始化為一個,並且被使用,使得它只有至多一個允許可用,可以用作互斥鎖。這通常被稱為二進位制訊號量,因為它只有兩個狀態:一個許可證可用,或零個許可證可用。當以這種方式使用時,二進位制訊號量具有屬性(與許多Lock實現不同),“鎖”可以由除所有者之外的執行緒釋放(因為訊號量沒有所有權概念)。這在某些專門的上下文中是有用的,例如死鎖恢復。

Semaphore(int permits) 建立一個 Semaphore與給定數量的許可證和非公平公平設定。  
Semaphore(int permits, boolean fair) 建立一個 Semaphore與給定數量的許可證和給定的公平設定。

  此類的建構函式可選擇接受公平引數。當設定為false時,此類不會保證執行緒獲取許可的順序。特別是,闖入是允許的,也就是說,一個執行緒呼叫acquire()可以提前已經等待執行緒分配的許可證-在等待執行緒佇列的頭部邏輯新的執行緒將自己【新執行緒將自己放在等待執行緒佇列的最前面】。當公平設定為真時,訊號量保證呼叫acquire方法的執行緒被選擇以按照它們呼叫這些方法的順序獲得許可(先進先出; FIFO)【FIFO的順序是指是依據到達方法內部的執行點的時間,並不是方法執行的時間。】。請注意,FIFO排序必須適用於這些方法中的特定內部執行點。因此,一個執行緒可以在另一個執行緒之前呼叫acquire,但是在另一個執行緒之後到達排序點,並且類似地從方法返回。另請注意,未定義的tryAcquire方法不符合公平性設定,但將採取任何可用的許可證。【不定時的tryAcquire()方法會任意選取可用的許可證。】【非公平鎖可以插隊獲取執行,公平鎖按照執行緒順序執行。】

  通常,用於控制資源訪問的訊號量應該被公平地初始化,以確保執行緒沒有被訪問資源【確保沒有執行緒因為長時間獲取不到許可證而餓死】。當使用訊號量進行其他型別的同步控制時,非正常排序的吞吐量優勢往往超過公平性。

  2.用法

  執行緒可以通過acquire()方法獲取到一個許可,然後對共享資源進行操作,如果許可集已分配完了,那麼執行緒將進入等待狀態,直到其他執行緒釋放許可才有機會再獲取許可,執行緒釋放一個許可通過release()方法完成,"許可"將被歸還給Semaphore。

    public static void main(String[] args) {
        ExecutorService service = Executors.newCachedThreadPool();
        final Semaphore sp = new Semaphore(3);
        for (int i = 0; i < 7; i++) {
            Runnable runnable = () -> {
                try {
                    sp.acquire();
                } catch (InterruptedException e1) {
                    e1.printStackTrace();
                }
                System.out.println("執行緒" + Thread.currentThread().getName() +
                        "進入,當前已有" + (3 - sp.availablePermits()) + "個併發");
                try {
                    Thread.sleep((long) (Math.random() * 10000));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("執行緒" + Thread.currentThread().getName() +
                        "即將離開");
                sp.release();
                //下面程式碼有時候執行不準確,因為其沒有和上面的程式碼合成原子單元
                System.out.println("執行緒" + Thread.currentThread().getName() +
                        "已離開,當前已有" + (3 - sp.availablePermits()) + "個併發");
            };
            service.execute(runnable);
        }
    }
結果:
執行緒pool-1-thread-1進入,當前已有1個併發
執行緒pool-1-thread-2進入,當前已有2個併發
執行緒pool-1-thread-3進入,當前已有3個併發
執行緒pool-1-thread-3即將離開
執行緒pool-1-thread-4進入,當前已有3個併發
執行緒pool-1-thread-3已離開,當前已有3個併發
執行緒pool-1-thread-1即將離開
執行緒pool-1-thread-1已離開,當前已有2個併發
執行緒pool-1-thread-5進入,當前已有3個併發
執行緒pool-1-thread-5即將離開
執行緒pool-1-thread-5已離開,當前已有2個併發
執行緒pool-1-thread-6進入,當前已有3個併發
執行緒pool-1-thread-4即將離開
執行緒pool-1-thread-4已離開,當前已有2個併發
執行緒pool-1-thread-7進入,當前已有3個併發
執行緒pool-1-thread-2即將離開
執行緒pool-1-thread-2已離開,當前已有2個併發
執行緒pool-1-thread-7即將離開
執行緒pool-1-thread-7已離開,當前已有1個併發
執行緒pool-1-thread-6即將離開
執行緒pool-1-thread-6已離開,當前已有0個併發

  3.acquire解析

acquire() 從該訊號量獲取許可證,阻止直到可用,或執行緒為 interrupted 。  
void acquire(int permits) 從該訊號量獲取給定數量的許可證,阻止直到所有可用,否則執行緒為 interrupted 。
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);//呼叫AQS的acquireSharedInterruptibly
    }
        /**
         * AQS的acquireSharedInterruptibly
         * Acquires in shared mode, aborting if interrupted.  Implemented
         * by first checking interrupt status, then invoking at least once
         * {@link #tryAcquireShared}, returning on success.  Otherwise the
         * thread is queued, possibly repeatedly blocking and unblocking,
         * invoking {@link #tryAcquireShared} until success or the thread
         * is interrupted.
         * 以共享模式獲取,如果中斷被中止。
         * 實現首先檢查中斷狀態,然後至少呼叫一次tryacquirered,成功返回。
         * 否則,執行緒排隊,可能會重複阻塞和取消阻塞,
         * 呼叫tryacquiremred直到成功或執行緒被打斷了。
         */
    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())//中斷丟擲異常
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)//獲取失敗,加入同步佇列等待
            doAcquireSharedInterruptibly(arg);
    }
    //由Semaphore的FairSync或NonfairSync實現,共享模式下資源可以被多個執行緒通知佔用,tryAcquireShared返回int型別,表示還有多少個資源可以同時被佔用,用於共享模式下傳播喚醒。
    protected int tryAcquireShared(int arg) {
        throw new UnsupportedOperationException();
    }
    //以共享中斷模式獲取
    private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.SHARED);//建立當前執行緒的節點,並且鎖的模型是共享鎖,將其新增到AQS CLH佇列的末尾
        boolean failed = true;
        try {
            for (;;) {//自旋
                final Node p = node.predecessor();//獲得當前節點的前驅節點
                if (p == head) {//是頭節點,沒有等待節點
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {//獲取成功當前節點設定為頭節點並傳播【傳播指的是,同步狀態剩餘的許可數值不為0,通知後續結點繼續獲取同步狀態】
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        failed = false;
                        return;
                    }
                }
                //前繼節點非head節點,沒資源獲取,將前繼節點狀態設定為SIGNAL,通過park掛起node節點的執行緒
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);//結束該結點執行緒的請求
        }
    }
  public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();//數量小於0丟擲異常
        sync.acquireSharedInterruptibly(permits);//呼叫AQS的acquireSharedInterruptibly
    }

  tryAcquireShared:

    static final class FairSync extends Sync {//公平鎖獲取
        protected int tryAcquireShared(int acquires) {
            for (;;) {//自旋
                //有前驅節點,表示當前執行緒前面有阻塞執行緒,當前執行緒獲取失敗,先讓前節點執行緒獲取執行【比非公平鎖獲取多了判斷前驅節點的操作】
                if (hasQueuedPredecessors())
                    return -1;
                int available = getState();//可獲取的許可證數量
                int remaining = available - acquires;//剩下的許可證數量
                //如果剩餘數量小於0或更新剩餘數量成功,返回剩餘數量
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }
    }
    static final class NonfairSync extends Sync {//非公平鎖獲取
        protected int tryAcquireShared(int acquires) {
            return nonfairTryAcquireShared(acquires);//呼叫Semaphore的內部類Sync的nonfairTryAcquireShared
        }
    }
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {//自旋
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

  4.release解析

    public void release() {
        sync.releaseShared(1);//呼叫AQS的releaseShared
    }
    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {//釋放同步狀態成功
            doReleaseShared();//喚醒同步佇列中後繼結點的執行緒
            return true;
        }
        return false;
    }
    protected boolean tryReleaseShared(int arg) {//由Semaphore的Sync實現
        throw new UnsupportedOperationException();
    }
    private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         * 保證釋放動作(向同步等待佇列尾部)傳遞,即使沒有其他正在進行的
         * 請求或釋放動作。如果頭節點的後繼節點需要喚醒,那麼執行喚醒
         * 動作;如果不需要,將頭結點的等待狀態設定為PROPAGATE保證
         * 喚醒傳遞。另外,為了防止過程中有新節點進入(佇列),這裡必
         * 需做迴圈,所以,和其他unparkSuccessor方法使用方式不一樣
         * 的是,如果(頭結點)等待狀態設定失敗,重新檢測。
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;//頭節點狀態
                 // 如果頭節點對應的執行緒是SIGNAL狀態,則意味著頭
                 //結點的後繼結點所對應的執行緒需要被unpark-喚醒。
                if (ws == Node.SIGNAL) {
                    // 修改頭結點對應的執行緒狀態設定為0。失敗的話,則繼續迴圈。
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases
                    // 喚醒頭結點h的後繼結點所對應的執行緒
                    unparkSuccessor(h);
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }
            // 如果頭結點發生變化,則繼續迴圈。否則,退出迴圈。
            if (h == head)                   // loop if head changed
                break;
        }
    }
    //喚醒傳入結點的後繼結點對應的執行緒
    private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
          if (ws < 0)
              compareAndSetWaitStatus(node, ws, 0);
           //拿到後繼結點
          Node s = node.next;
          if (s == null || s.waitStatus > 0) {
              s = null;
              for (Node t = tail; t != null && t != node; t = t.prev)
                  if (t.waitStatus <= 0)
                      s = t;
          }
          if (s != null)
              //喚醒該執行緒
              LockSupport.unpark(s.thread);
    }
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {//自旋
            int current = getState();//獲取當前同步狀態
            int next = current + releases;//狀態+1
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))//更新狀態成功返回true
                return true;
        }
    }

二.Exchanger

  1.概念

  執行緒可以在成對內配對和交換元素的同步點。每個執行緒在輸入exchange方法時提供一些物件,與合作者執行緒匹配,並在返回時接收其合作伙伴的物件。交換器可以被視為一個的雙向形式SynchronousQueue。交換器在諸如遺傳演算法和管道設計的應用中可能是有用的。

  一個用於兩個工作執行緒之間交換資料的封裝工具類,簡單說就是一個執行緒在完成一定的事務後想與另一個執行緒交換資料,則第一個先拿出資料的執行緒會一直等待第二個執行緒,直到第二個執行緒拿著資料到來時才能彼此交換對應資料。  

  2.用法

  Exchanger<V> 泛型型別,其中 V 表示可交換的資料型別

  • V exchange(V v):等待另一個執行緒到達此交換點(除非當前執行緒被中斷),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。

  • V exchange(V v, long timeout, TimeUnit unit):等待另一個執行緒到達此交換點(除非當前執行緒被中斷或超出了指定的等待時間),然後將給定的物件傳送給該執行緒,並接收該執行緒的物件。

        Exchanger<Integer> exchanger = new Exchanger<>();
        ExecutorService executor = Executors.newCachedThreadPool();
        Runnable run = () ->{
            try {
                int num = new Random().nextInt(10);
                System.out.println(Thread.currentThread().getName()+"開始交換資料:"+num);
                num = exchanger.exchange(num);//交換資料並得到交換後的資料
                System.out.println(Thread.currentThread().getName()+"交換資料結束後的資料:"+num);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        };
        executor.execute(run);
        executor.execute(run);
        executor.shutdown();
結果:
pool-1-thread-2開始交換資料:9
pool-1-thread-1開始交換資料:8
pool-1-thread-2交換資料結束後的資料:8
pool-1-thread-1交換資料結束後的資料:9

  3.exchange原始碼解析 參考下面的部落格,寫的很詳細

  • https://blog.csdn.net/qq_31865983/article/details/105620881
  • https://www.xuebuyuan.com/2736097.html