1. 程式人生 > >Java併發包之閉鎖/柵欄/訊號量

Java併發包之閉鎖/柵欄/訊號量

一、Java多執行緒總結:
  1. 描述執行緒的類:Runable和Thread都屬於java.lang包。
  2. 內建鎖synchronized屬於jvm關鍵字,內建條件佇列操作介面Object.wait()/notify()/notifyAll()屬於java.lang包。
  3. 提供記憶體可見性和防止指令重排的volatile屬於jvm關鍵字。
  4. 而java.util.concurrent包(J.U.C)中包含的是java併發程式設計中有用的一些工具類,包括幾個部分:
    • locks部分:包含在java.util.concurrent.locks包中,提供顯式鎖(互斥鎖和速寫鎖)相關功能。
    • atomic部分:包含在java.util.concurrent.atomic包中,提供原子變數類相關的功能,是構建非阻塞演算法的基礎。
    • executor部分:散落在java.util.concurrent包中,提供執行緒池相關的功能。
    • collections部分:散落在java.util.concurrent包中,提供併發容器相關功能。
    • tools部分:散落在java.util.concurrent包中,提供同步工具類,如訊號量、閉鎖、柵欄等功能。
      這裡寫圖片描述
二、同步工具類詳解

1、Semaphore訊號量:跟鎖機制存在一定的相似性,semaphore也是一種鎖機制,所不同的是,reentrantLock是隻允許一個執行緒獲得鎖,而訊號量持有多個許可(permits),允許多個執行緒獲得許可並執行。可以用來控制同時訪問某個特定資源的運算元量,或者同時執行某個指定操作的數量。

示例程式碼

 5 public class TIJ_semaphore {
 6     public static void main(String[] args) {
 7         ExecutorService exec = Executors.newCachedThreadPool();
 8         final Semaphore semp = new Semaphore(5); // 5 permits
 9 
10         for (int index = 0; index < 20; index++) {
11             final int
NO = index; 12 Runnable run = new Runnable() { 13 public void run() { 14 try { // if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable 15 semp.acquire(); 16 System.out.println("Accessing: " + NO); 17 Thread.sleep((long) (10000); 18 semp.release(); 19 } catch (InterruptedException e) { 20 } 21 } 22 }; 23 exec.execute(run); 24 } 25 exec.shutdown(); 26 }

2、CountDownLatch閉鎖:允許一個或多個執行緒一直等待,直到其他執行緒的操作執行完後再執行。CountDownLatch是通過一個計數器來實現的,計數器的初始值為執行緒的數量。每當一個執行緒完成了自己的任務後,計數器的值就會減1。當計數器值到達0時,它表示所有的執行緒已經完成了任務,然後在閉鎖上等待的執行緒就可以恢復執行任務。

主要方法
1. CountDownLatch.await():將某個執行緒阻塞住,直到計數器count=0才恢復執行。
2. CountDownLatch.countDown():將計數器count減1。

使用場景
1. 實現最大的並行性:有時我們想同時啟動多個執行緒,實現最大程度的並行性。例如,我們想測試一個單例類。如果我們建立一個初始計數為1的CountDownLatch,並讓所有執行緒都在這個鎖上等待,那麼我們可以很輕鬆地完成測試。我們只需呼叫 一次countDown()方法就可以讓所有的等待執行緒同時恢復執行。
2. 開始執行前等待n個執行緒完成各自任務:例如應用程式啟動類要確保在處理使用者請求前,所有N個外部系統已經啟動和運行了。
3. 死鎖檢測:一個非常方便的使用場景是,你可以使用n個執行緒訪問共享資源,在每次測試階段的執行緒數目是不同的,並嘗試產生死鎖。
4. 計算併發執行某個任務的耗時。

示例程式碼

public class CountDownLatchTest {  

    public void timeTasks(int nThreads, final Runnable task) throws InterruptedException{  
        final CountDownLatch startGate = new CountDownLatch(1);  
        final CountDownLatch endGate = new CountDownLatch(nThreads);  

        for(int i = 0; i < nThreads; i++){  
            Thread t = new Thread(){  
                public void run(){  
                    try{  
                        startGate.await();  
                        try{  
                            task.run();  
                        }finally{  
                            endGate.countDown();  
                        }  
                    }catch(InterruptedException ignored){  

                    }  

                }  
            };  
            t.start();  
        }  

        long start = System.nanoTime();  
        System.out.println("開啟閉鎖");  
        startGate.countDown();  
        endGate.await();  
        long end = System.nanoTime();  
        System.out.println("閉鎖退出,共耗時" + (end-start));  
    }  

    public static void main(String[] args) throws InterruptedException{  
        CountDownLatchTest test = new CountDownLatchTest();  
        test.timeTasks(5, test.new RunnableTask());  
    }  

    class RunnableTask implements Runnable{  

        @Override  
        public void run() {  
            System.out.println("當前執行緒為:" + Thread.currentThread().getName());  

        }     
    }  

執行結果為:  
開啟閉鎖  
當前執行緒為:Thread-0  
當前執行緒為:Thread-3  
當前執行緒為:Thread-2  
當前執行緒為:Thread-4  
當前執行緒為:Thread-1  
閉鎖退出,共耗時1109195  

3、CyclicBarrier柵欄:用於阻塞一組執行緒直到某個事件發生。所有執行緒必須同時到達柵欄位置才能繼續執行下一步操作,且能夠被重置以達到重複利用。而閉鎖是一次性物件,一旦進入終止狀態,就不能被重置。

示例程式碼

public class CyclicBarrierTest {
    private final CyclicBarrier barrier;
    private final Worker[] workers;

    public CyclicBarrierTest(){
        int count = Runtime.getRuntime().availableProcessors();
        this.barrier = new CyclicBarrier(count,
                new Runnable(){

                    @Override
                    public void run() {
                        System.out.println("所有執行緒均到達柵欄位置,開始下一輪計算");
                    }

        });
        this.workers = new Worker[count];
        for(int i = 0; i< count;i++){
            workers[i] = new Worker(i);
        }
    }
    private class Worker implements Runnable{
        int i;

        public Worker(int i){
            this.i = i;
        }

        @Override
        public void run() {
            for(int index = 1; index < 3;index++){
                System.out.println("執行緒" + i + "第" + index + "次到達柵欄位置,等待其他執行緒到達");
                try {
                    //注意是await,而不是wait
                    barrier.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                    return;
                } catch (BrokenBarrierException e) {
                    e.printStackTrace();
                    return;
                }
            }
        }

    }

    public void start(){
        for(int i=0;i<workers.length;i++){
            new Thread(workers[i]).start();
        }
    }

    public static void main(String[] args){
        new CyclicBarrierTest().start();
    }
}

執行結果為:  
執行緒01次到達柵欄位置,等待其他執行緒到達  
執行緒11次到達柵欄位置,等待其他執行緒到達  
執行緒21次到達柵欄位置,等待其他執行緒到達  
執行緒31次到達柵欄位置,等待其他執行緒到達  
所有執行緒均到達柵欄位置,開始下一輪計算  
執行緒32次到達柵欄位置,等待其他執行緒到達  
執行緒22次到達柵欄位置,等待其他執行緒到達  
執行緒02次到達柵欄位置,等待其他執行緒到達  
執行緒12次到達柵欄位置,等待其他執行緒到達  
所有執行緒均到達柵欄位置,開始下一輪計算