1. 程式人生 > >java架構之路(多執行緒)JUC併發程式設計之Semaphore訊號量、CountDownLatch、CyclicBarrier柵欄、Executors執行緒池

java架構之路(多執行緒)JUC併發程式設計之Semaphore訊號量、CountDownLatch、CyclicBarrier柵欄、Executors執行緒池

上期回顧:

  上次部落格我們主要說了我們juc併發包下面的ReetrantLock的一些簡單使用和底層的原理,是如何實現公平鎖、非公平鎖的。內部的雙向連結串列到底是什麼意思,prev和next到底是什麼,為什麼要引入heap和tail來值向null的Node節點。高併發時候是如何保證state來記錄重入鎖的,在我們的上次部落格都做了詳細的說明。這次我們來聊一些簡單易懂且實用的AQS中的工具類。

Semaphore訊號量:

  這個東西很簡單,別看字面意思,什麼訊號量,我也不懂得那個術語什麼意思,Semaphore你可以這樣來理解,我們要去看電影,而且是3D電影(必須戴3D眼鏡才可以進入),但是比較不巧的是我們電影院只有兩個3D眼鏡了,也就是說,我們每次只能進去兩個人看電影,然後等待這兩個人看完電影以後把眼鏡還回來,後面的兩個人才能繼續觀看,就是說每次只允許最多進去兩個人,每次進入到執行緒獲取鎖,需要你得到前置的票據,才可以進行後續的流程。可以理解為一個簡單的限流吧。我們來一下程式碼示例。

public class Test {

    public static void main(String[] args) throws InterruptedException {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 5; i++) {
            new Thread(new Task(semaphore,"xiaocaijishu"+i)).start();
        }
    }

    static class Task extends Thread{
        Semaphore semaphore;

        public Task(Semaphore semaphore,String tname){
            this.semaphore = semaphore;
            this.setName(tname);
        }

        public void run() {
            try {
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"拿著3D眼鏡進去了,時間是"+System.currentTimeMillis());

                Thread.sleep(1000);

                semaphore.release();
                System.out.println(Thread.currentThread().getName()+"出來了,將3D眼鏡還給了服務人員,時間是"+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

執行結果就是這樣的

  我們來解釋一下執行結果,執行緒1和執行緒3同一時間去看電影了,然後1出來了,這時執行緒9馬上拿著我們的3D眼鏡進去了,過了一會執行緒3也看完電影了,出來了還了3D眼鏡,執行緒7又在同一時間拿著3D眼鏡進去看電影了,後續執行緒都是如此執行的,每次只是進入兩個執行緒。

  簡單的使用看到了,我們來看看底層的原始碼設計吧。開始的時候我們是建立一個Semaphore內部票據數目給予的是2。

//1.建立初始票據是2的Semaphore 
Semaphore semaphore = new Semaphore(2);
//2.進入Semaphore,檢視資料2是如何儲存的.
public Semaphore(int permits) {
    sync = new NonfairSync(permits);
}
//3.底層還是基於sync 建立了一個物件,但不同於過去ReetrantLock的是,這次是一個非公平的鎖物件,我們再次進入NonfairSync看看那個數字2到底放在哪裡了.
Sync(int permits) {
    setState(permits);
}
//4.我們可以看到底層還是用State來儲存的.

  這次沒有把所有程式碼全部粘出來,感覺那樣像是湊篇幅一樣。

  通過上述程式碼,我們可以看到,我們的初始票據數,是上一次那個state來存的。

  後續我們呼叫了acquire方法來嘗試獲取票據,acquire方法也可以傳入獲取票據數目的比如semaphore.acquire(2);也是可以的。我們進入acquire方法來看看到底是如何獲取的。

//從new Semaphore(2);點選進入後續方法
public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
//我們可以看到,當我們沒有傳需要獲取多少票據的時候,會預設給予1這個引數,我們來繼續看後續流程
public final void acquireSharedInterruptibly(int arg)
        throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}
//Thread.interrupted()判斷當前執行緒是否已經中斷,如果中斷我直接丟擲異常,電影都演完了,我拿3D眼鏡還有毛線用.
//tryAcquireShared(arg)嘗試獲取票據,arg是1,剛才給予的預設1
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
                compareAndSetState(available, remaining))
            return remaining;
    }
}
//內部有實現關係,所以呼叫的是Semaphore類nonfairTryAcquireShared方法,我們來解讀一下
//直接就是一個死迴圈, int available = getState();獲取一下當前還有多少票據
// int remaining = available - acquires;計算出當前票據減去所需票據的一個剩餘值
//if (remaining < 0 || compareAndSetState(available, remaining))我們現有2個票據,拿走1個,剩餘1個,所以remaining < 0 一定是false的
//再來看另一半compareAndSetState,用原子計算(上次部落格說過為什麼要原子計算)方式來修改剩餘票據,這個是可以修改成功的.所以滿足條件可以返回一個2-1  也就是返回一個正數1

  是不是有點看懵圈了,很多小夥伴感覺if (remaining < 0 ||compareAndSetState(available, remaining))前面的remaining<0,這個或判斷貌似沒用啊,來張圖解釋一下。

   有沒有感覺好點了,自己可以跟著原始碼走一走,獲取的過程就差一個doAcquireSharedInterruptibly還沒有看了,如果獲取超過了票據數,也就是不應該讓返回負數時執行doAcquireSharedInterruptibly方法,我們來看一下。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);//以共享方式新增節點
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();//判斷前驅節點是否為空
            if (p == head) {
                int r = tryAcquireShared(arg);//再次嘗試獲取票據
                if (r >= 0) {//>= 0表示獲取票據成功
                    setHeadAndPropagate(node, r);//更改頭節點
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) && //剔除不可用的Node節點
                    parkAndCheckInterrupt()) //阻塞當前執行緒
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

  經過兩次以上的嘗試,我們將該執行緒阻塞了,不至於一直for迴圈在執行,也就這樣,票據發放完畢了。

  過程差不多就是這樣的,我們可以再仔細看一下是如何新增節點的,上次ReetrantLock說了一些,我們這次再來看一下。我們現已第一次塞節點為例,

private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {//第一次一定是空的,我們現在已初始塞節點為例。
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);//為空直接進入這個邏輯
    return node;
}
private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize //1.第一次一定是空 //二次迴圈不為空 進入else
            if (compareAndSetHead(new Node()))//2.建立一個空節點,並且作為head節點.
                tail = head;//3.tail指向那個head節點
        } else {
            node.prev = t;//4. 將node節點的前驅指標指向
            if (compareAndSetTail(t, node)) {//5.原子計算方式將node節點後驅節點指向tail
                t.next = node;//6.將t節點(空節點)的後驅指標指向node節點
                return t;
            }
        }
    }
}

 

   第一次迴圈只是一個內部的初始空節點,第二次迴圈才是移動指標塞入的過程。

 

   節點喚醒是在釋放票據時被喚醒的,程式碼超級簡單,可以自己當做一份作業,自己去看一遍程式碼吧~!提示流程就是先還票據,然後喚醒。Semaphore差不多就這些知識點,我也帶著大家簡單的看了一遍原始碼。我們再來繼續看一下後面AQS的一些工具類。

CountDownLaunch的基本使用

  CountDownLaunch很好理解,也是比較實用的,我們幹王者農藥的時候就是一個很好的栗子,遊戲選完人物大家一起載入地圖等遊戲資料,有的人慢,有的人快,這時就印出來了CountDownLaunch,相當於我們5個玩家同時開啟5個執行緒,然後一起執行,執行完畢先等著,直到5個玩家全部執行完成時,才可以執行後續操作。我們來看一下程式碼。

public class CountDownLaunchSample {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        new Thread(new playerOne(countDownLatch)).start();
        new Thread(new playerTwo(countDownLatch)).start();
        countDownLatch.await();
        System.out.println("全部載入完成");
    }

    static class playerOne implements Runnable {
        CountDownLatch countDownLatch;

        public playerOne(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public void run() {
            try {
                System.out.println("玩家1開始載入...");
                Thread.sleep(2000);
                System.out.println("玩家1載入完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (countDownLatch != null)
                    countDownLatch.countDown();
            }
        }

    }


    static class playerTwo implements Runnable {
        CountDownLatch countDownLatch;

        public playerTwo(CountDownLatch countDownLatch) {
            this.countDownLatch = countDownLatch;
        }

        public void run() {
            try {
                System.out.println("玩家2開始載入...");
                Thread.sleep(10000);
                System.out.println("玩家2載入完成");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (countDownLatch != null)
                    countDownLatch.countDown();
            }
        }

    }
}

   實際專案中如果遇到讀取excel多個sheet頁籤然後彙總資料的情況也可以採用CountDownLanch。注意最後final的countDownLatch.countDown()方法,也是一個類似上面票據增減的方法。

CyclicBarrier柵欄的簡單使用:

  CyclicBarrier和我們上面的CountDownLanch差不多,都是開啟多個任務一起去執行,不同的是CountDownLanch需要支線任務執行完成然後CountDownLanch做一個彙總,然後繼續執行後續程式。CyclicBarrier不需要做彙總。再就是CyclicBarrier是可以重複的。

public class CyclicBarrierTest implements Runnable {
    private CyclicBarrier cyclicBarrier;
    private int index ;

    public CyclicBarrierTest(CyclicBarrier cyclicBarrier, int index) {
        this.cyclicBarrier = cyclicBarrier;
        this.index = index;
    }

    public void run() {
        try {
            System.out.println("index: " + index);
            index--;
            cyclicBarrier.await();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(11, new Runnable() {
            public void run() {
                System.out.println("所有特工到達屏障,準備開始執行祕密任務");
            }
        });
        for (int i = 0; i < 10; i++) {
            new Thread(new CyclicBarrierTest(cyclicBarrier, i)).start();
        }
        cyclicBarrier.await();
        System.out.println("全部到達屏障....");
    }

}

  這個需要注意的是CyclicBarrier cyclicBarrier = new CyclicBarrier(11, 這個11,就是說一定有11個執行緒執行完畢,我才可以執行後面的操作,我們下面for迴圈是10,而我們那裡寫的是11啊,別忘記還有一個主執行緒呢,所以說每次計算一定加一個主執行緒啊。

Exchanger的簡單使用

  最後就是我們Exchanger,平時使用的不多,我們瞭解一下就可以了,摟一眼程式碼,就是執行緒之間的變數交換。

public static void main(String []args) {
    final Exchanger<Integer> exchanger = new Exchanger<Integer>();
    for(int i = 0 ; i < 4 ; i++) {
        final Integer num = i;
        new Thread() {
            public void run() {
                System.out.println("我是執行緒:Thread_" + this.getName() + "我的資料是:" + num);
                try {
                    Integer exchangeNum = exchanger.exchange(num);
                    Thread.sleep(1000);
                    System.out.println("我是執行緒:Thread_" + this.getName() + "我原先的資料為:" + num + " , 交換後的資料為:" + exchangeNum);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }.start();
    }
}

總結:

  這次我們核心梳理了我們的Semaphore的執行流程,內部是如何來實現我們的票據計數,獲取,歸還等操作的,再就是我們for無限迴圈會在兩次以後自動阻塞的設計思想,還有我們的CountDownLanch、CyclicBarrier、Executors的基本使用,並賦予大家簡單的程式碼流程,今天就說到這,明天我們繼續來說我們的多執行緒。