1. 程式人生 > >常用的併發工具類

常用的併發工具類

在java 1.5中,提供了一些非常有用的輔助類來幫助我們進行併發程式設計,比如CountDownLatch,CyclicBarrier、Semaphore和Exchange

CountDownLatch

CountDownLatch是一個同步計數器,初始化的時候 傳入需要計數的執行緒等待數,可以是需要等待執行完成的執行緒數,或者大於。
作用:用來協調多個執行緒之間的同步,或者說起到執行緒之間的通訊(而不是用作互斥的作用)。是一組執行緒等待其他的執行緒完成工作以後在執行,相當於加強版join,其中:

await():阻塞當前執行緒,等待其他執行緒執行完成,直到計數器計數值減到0。
countDown():負責計數器的減一。

例如:主執行緒等待其他六個執行緒執行完成,再執行主執行緒。

/**
 * CountDownLatch計數器演示,由外部執行緒控制一組執行緒的放行
 */
public class CountDownLatchDemo {
    
    private static CountDownLatch countDownLatch = new CountDownLatch(6);

    private static class InitThread extends Thread {

        @Override
        public void run() {
            System.out.println("執行緒" + Thread.currentThread().getName() + "開始初始化....");
            countDownLatch.countDown();
        }
    }

    private static class BusiThread extends Thread {

        @Override
        public void run() {
            try {
                System.out.println("執行緒" + Thread.currentThread().getName() + "準備執行....");

                //只有等countDownLatch的計數器為0,也就是其他計數器的執行緒都執行完了,才能執行
                countDownLatch.await();
                SleepTools.second(1);
                System.out.println("執行緒" + Thread.currentThread().getName() + "執行完成....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        new Thread(new Runnable() {
            @Override
            public void run() {
                System.out.println("執行緒" + Thread.currentThread().getName() + "開始初始化....");
                countDownLatch.countDown();
            }
        }, "main-2").start();

        new BusiThread().start();

        for (int i = 0; i < 5; i++) {
            new InitThread().start();
        }
        try {
            countDownLatch.await();

            System.out.println("主執行緒執行結束.......");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

執行結果:
在這裡插入圖片描述

CyclicBarrier

CyclicBarrier字面意思是柵欄,是多執行緒中一個重要的類,主要用於執行緒組內部之間的執行緒的相互等待問題,初始化的時候傳入需要等待的執行緒數。
作用:讓一組執行緒達到某個屏障被阻塞,一直到組內最後一個執行緒達到屏障時,屏障開放,所有被阻塞的執行緒才會繼續執行。其中:

CyclicBarrier(int parties):初始化定義需要等待的執行緒數parties。
CyclicBarrier(int parties, Runnable barrierAction):當屏障開放的時候,執行緒barrierAction的任務會執行。

CountDownLatch和CyclicBarrier的區別:

1、countdownlatch放行由第三者控制,CyclicBarrier放行由一組執行緒本身控制
2、countdownlatch放行條件》=執行緒數,CyclicBarrier放行條件=執行緒數
3、CountDownLatch會阻塞主執行緒,CyclicBarrier不會阻塞主執行緒,只會阻塞子執行緒。
4、CountDownLatch的計數器只能使用一次。而CyclicBarrier的計數器可以使用reset()方
法重置。所以CyclicBarrier能處理更為複雜的業務場景,比如如果計算髮生錯誤,可以重置計數器,並讓執行緒們重新執行一次。

例如:主執行緒等待5個子執行緒執行完成

/**
 * CyclicBarrier柵欄演示,由一組執行緒本身控制放行
 */
public class CyclicBarrierDemo {

    //定義一個柵欄,在柵欄放行的時候,同時可以執行CollectThread這個執行緒
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new CollectThread());

    private static ConcurrentHashMap<Long, String> concurrentHashMap = new ConcurrentHashMap();
    
    private static class CollectThread extends Thread {

        @Override
        public void run() {
            System.out.println(concurrentHashMap.size());
        }
    }
    
    private static class SubThread extends Thread {
        @Override
        public void run() {
            try {
                Random random = new Random();
                long id = Thread.currentThread().getId();
                String name = Thread.currentThread().getName();
                concurrentHashMap.put(id, name);
                if (random.nextBoolean()) {
                    System.out.println("執行緒" + name + "正在處理中....");
                    SleepTools.second(2);
                }
                System.out.println("執行緒" + name + "處理結束,等待其他執行緒處理結束....");

                //每個子執行緒都會在這裡阻塞,等待所有的子執行緒都執行到這裡,才會一起放行
                cyclicBarrier.await();
                System.out.println("執行緒" + name + "處理完畢....");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            }
        }
    }
    
    public static void main(String[] args) {
        for (int i = 0; i < 5; i++) {
            new SubThread().start();
        }

        //CyclicBarrier,不會阻塞主執行緒
        System.out.println("主執行緒.....");
    }
}

執行結果:
在這裡插入圖片描述

Semaphore

Semaphore又名訊號量,是作業系統中的一個概念,在Java併發程式設計中,訊號量控制的是執行緒併發的數量。
作用:Semaphore管理一系列許可證。每個acquire方法阻塞,直到有一個許可證可以獲得然後拿走一個許可證;每個release方法增加一個許可證,這可能會釋放一個阻塞的acquire方法。然而,其實並沒有實際的許可證這個物件,Semaphore只是維持了一個可獲得許可證的數量,主要控制同時訪問某個特定資源的執行緒數量,多用在流量控制。
注意:其他Semaphore的底層實現就是基於AQS的共享鎖實現的。

如果一個執行緒要訪問共享資源,必須先獲得訊號量,如果訊號量的計數器值大於1,意味
著有共享資源可以訪問,則使其計數器值減去1,再訪問共享資源。如果計數器值為0,線
程進入休眠。當某個執行緒使用完共享資源後,釋放訊號量,並將訊號量內部的計數器加1,之前進入休眠的執行緒將被喚醒並再次試圖獲得訊號量。

例如:使用Semaphore模擬資料庫連線池,Semaphore訊號量是可以控制多個執行緒訪問同一資源。

/**
 * 模擬資料庫連線實現
 */
public class ConnectionImpl implements Connection {

    public static Connection getConnection() {
        return new ConnectionImpl();
    }

    @Override
    public Statement createStatement() throws SQLException {
        //模擬操作
        SleepTools.ms(10);
        return null;
    }
    //剩下的重寫的方法就省略了........
 }

這個類是模擬資料庫連線操作,其中重寫了一個方法,休眠10毫秒,用於模擬實際操作耗時。

/**
 * 用Semaphore訊號量,模擬資料庫連線池,Semaphore訊號量是可以控制多個執行緒訪問同一資源
 */
public class SemaphoreDemo {

    //執行緒池
    private static LinkedList<Connection> list = new LinkedList<>();

    //執行緒池的大小
    private static final int POOL_SIZE = 10;

    //useful:可以使用的資料庫連線數   userless:不能使用的資料庫連線數
    private static Semaphore useful, userless;

    public SemaphoreDemo() {
        useful = new Semaphore(POOL_SIZE);
        userless = new Semaphore(0);
    }

    //初始化執行緒池
    static {
        for (int i = 0; i < POOL_SIZE; i++) {
            list.add(new ConnectionImpl());
        }
    }


    //獲取資料庫連線
    public static Connection getConnection() {
        Connection connection = null;
        try {
            //可用的訊號獲取許可證,阻塞方法,只有當前執行緒獲取到許可證才能放行
            useful.acquire();

            synchronized (list) {
                //取出連線池中的第一個
                connection = list.removeFirst();
            }
            //不可用的訊號釋放許可證,將其返回到訊號量
            userless.release();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return connection;
    }

    //釋放連線
    public static void releaseConnection(Connection connection) {
        if (connection != null) {
            System.out.println("當前有" + useful.getQueueLength() + "個執行緒等待資料庫連線!!"
                    + "可用連線數:" + useful.availablePermits());
            try {
                //不可用的訊號獲取許可證
                userless.acquire();
                //放回池
                synchronized (list) {
                    list.addLast(connection);
                }
                //可用的訊號釋放許可證
                useful.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

測試:

public class TestSemphore {

    private static SemaphoreDemo semaphoreDemo = new SemaphoreDemo();

    private static class BusiThread extends Thread {
        @Override
        public void run() {
            Random r = new Random();//讓每個執行緒持有連線的時間不一樣
            long start = System.currentTimeMillis();
            Connection connection = semaphoreDemo.getConnection();
            System.out.println("Thread_" + Thread.currentThread().getId()
                    + "_獲取資料庫連線共耗時:" + (System.currentTimeMillis() - start) + "ms.....");
            //模擬業務操作,執行緒持有連線查詢資料
            SleepTools.ms(100 + r.nextInt(100));
            System.out.println("查詢資料完成,歸還連線!");
            semaphoreDemo.releaseConnection(connection);
        }
    }


    public static void main(String[] args) {

        for (int i = 0; i < 50; i++) {
            new BusiThread().start();
        }
    }
}

執行結果:
在這裡插入圖片描述

Exchange

Exchange類似於一個交換器,可以在對中對元素進行配對和交換的執行緒的同步點,用於兩個執行緒間的資料交換。
具體來說,Exchanger類允許在兩個執行緒之間定義同步點。當兩個執行緒都到達同步點時,他們交換資料結構,因此第一個執行緒的資料結構進入到第二個執行緒中,第二個執行緒的資料結構進入到第一個執行緒中。
例如:交換兩個執行緒各自的資料。

public class ExchangerDemo {

    private static Exchanger<Set<String>> exchanger = new Exchanger<>();

    private static class ExchangerClassO extends Thread {
        private Set<String> set;

        public ExchangerClassO(Set<String> set) {
            this.set = set;
        }

        @Override
        public void run() {
            try {
                exchange(set);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }


    private static class ExchangerClassT extends Thread {
        private Set<String> set;

        public ExchangerClassT(Set<String> set) {
            this.set = set;
        }

        @Override
        public void run() {
            try {
                exchange(set);

            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    private static void exchange(Set<String> set) throws InterruptedException {
        //交換資料,阻塞
        System.out.println("執行緒:" + Thread.currentThread().getName() + "交換前得值....");
        for (String s : set) {
            System.out.println(s);
        }
        exchanger.exchange(set);
        System.out.println("執行緒:" + Thread.currentThread().getName() + "交換後得值....");
        for (String s : set) {
            System.out.println(s);
        }
    }


    public static void main(String[] args) {
        Set<String> setA = new HashSet<>();
        Set<String> setB = new HashSet<>();
        setA.add("a1");
        setA.add("b1");
        setA.add("c1");

        setB.add("a2");
        setB.add("b2");
    setB.add("c2");

    new ExchangerClassO(setA).start();
    new ExchangerClassT(setB).start();
}

}

執行結果:
在這裡插入圖片描述