1. 程式人生 > >J.U.C - AQS

J.U.C - AQS

java.util.concurrent(J.U.C)大大提高了併發效能,AQS 被認為是 J.U.C 的核心。

CountdownLatch

用來控制一個執行緒等待多個執行緒。

維護了一個計數器 cnt,每次呼叫 countDown() 方法會讓計數器的值減 1,減到 0 的時候,那些因為呼叫 await() 方法而在等待的執行緒就會被喚醒。

public class CountdownLatchExample {

    public static void main(String[] args) throws InterruptedException {
        
final int totalThread = 10; CountDownLatch countDownLatch = new CountDownLatch(totalThread); ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < totalThread; i++) { executorService.execute(() -> { System.out.print(
"run.."); countDownLatch.countDown(); }); } countDownLatch.await(); System.out.println("end"); executorService.shutdown(); } } run..run..run..run..run..run..run..run..run..run..end

CyclicBarrier

用來控制多個執行緒互相等待,只有當多個執行緒都到達時,這些執行緒才會繼續執行。

和 CountdownLatch 相似,都是通過維護計數器來實現的。執行緒執行 await() 方法之後計數器會減 1,並進行等待,直到計數器為 0,所有呼叫 await() 方法而在等待的執行緒才能繼續執行。

CyclicBarrier 和 CountdownLatch 的一個區別是,CyclicBarrier 的計數器通過呼叫 reset() 方法可以迴圈使用,所以它才叫做迴圈屏障。

CyclicBarrier 有兩個建構函式,其中 parties 指示計數器的初始值,barrierAction 在所有執行緒都到達屏障的時候會執行一次。

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0) throw new IllegalArgumentException();
    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

public CyclicBarrier(int parties) {
    this(parties, null);
}

 

public class CyclicBarrierExample {

    public static void main(String[] args) {
        final int totalThread = 10;
        CyclicBarrier cyclicBarrier = new CyclicBarrier(totalThread);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalThread; i++) {
            executorService.execute(() -> {
                System.out.print("before..");
                try {
                    cyclicBarrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
                System.out.print("after..");
            });
        }
        executorService.shutdown();
    }
}
before..before..before..before..before..before..before..before..before..before..after..after..after..after..after..after..after..after..after..after..

Semaphore

Semaphore 類似於作業系統中的訊號量,可以控制對互斥資源的訪問執行緒數。

 

以下程式碼模擬了對某個服務的併發請求,每次只能有 3 個客戶端同時訪問,請求總數為 10。

public class SemaphoreExample {

    public static void main(String[] args) {
        final int clientCount = 3;
        final int totalRequestCount = 10;
        Semaphore semaphore = new Semaphore(clientCount);
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < totalRequestCount; i++) {
            executorService.execute(()->{
                try {
                    semaphore.acquire();
                    System.out.print(semaphore.availablePermits() + " ");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    semaphore.release();
                }
            });
        }
        executorService.shutdown();
    }
}
2 1 2 2 2 2 2 1 2 2

FutureTask

在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現了 RunnableFuture 介面,該介面繼承自 Runnable 和 Future 介面,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。

public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>

FutureTask 可用於非同步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那麼就可以用 FutureTask 來封裝這個任務,主執行緒在完成自己的任務之後再去獲取結果。

public class FutureTaskExample {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() {
            @Override
            public Integer call() throws Exception {
                int result = 0;
                for (int i = 0; i < 100; i++) {
                    Thread.sleep(10);
                    result += i;
                }
                return result;
            }
        });

        Thread computeThread = new Thread(futureTask);
        computeThread.start();

        Thread otherThread = new Thread(() -> {
            System.out.println("other task is running...");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        otherThread.start();
        System.out.println(futureTask.get());
    }
}
other task is running...
4950

BlockingQueue

java.util.concurrent.BlockingQueue 介面有以下阻塞佇列的實現:

  • FIFO 佇列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
  • 優先順序佇列 :PriorityBlockingQueue

提供了阻塞的 take() 和 put() 方法:如果佇列為空 take() 將阻塞,直到佇列中有內容;如果佇列為滿 put() 將阻塞,直到佇列有空閒位置。

使用 BlockingQueue 實現生產者消費者問題

public class ProducerConsumer {

    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);

    private static class Producer extends Thread {
        @Override
        public void run() {
            try {
                queue.put("product");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("produce..");
        }
    }

    private static class Consumer extends Thread {

        @Override
        public void run() {
            try {
                String product = queue.take();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.print("consume..");
        }
    }
}
public static void main(String[] args) {
    for (int i = 0; i < 2; i++) {
        Producer producer = new Producer();
        producer.start();
    }
    for (int i = 0; i < 5; i++) {
        Consumer consumer = new Consumer();
        consumer.start();
    }
    for (int i = 0; i < 3; i++) {
        Producer producer = new Producer();
        producer.start();
    }
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..

ForkJoin

主要用於平行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務平行計算。

public class ForkJoinExample extends RecursiveTask<Integer> {

    private final int threshold = 5;
    private int first;
    private int last;

    public ForkJoinExample(int first, int last) {
        this.first = first;
        this.last = last;
    }

    @Override
    protected Integer compute() {
        int result = 0;
        if (last - first <= threshold) {
            // 任務足夠小則直接計算
            for (int i = first; i <= last; i++) {
                result += i;
            }
        } else {
            // 拆分成小任務
            int middle = first + (last - first) / 2;
            ForkJoinExample leftTask = new ForkJoinExample(first, middle);
            ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
            leftTask.fork();
            rightTask.fork();
            result = leftTask.join() + rightTask.join();
        }
        return result;
    }
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
    ForkJoinExample example = new ForkJoinExample(1, 10000);
    ForkJoinPool forkJoinPool = new ForkJoinPool();
    Future result = forkJoinPool.submit(example);
    System.out.println(result.get());
}

ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的執行緒池,執行緒數量取決於 CPU 核數。

public class ForkJoinPool extends AbstractExecutorService

ForkJoinPool 實現了工作竊取演算法來提高 CPU 的利用率。每個執行緒都維護了一個雙端佇列,用來儲存需要執行的任務。工作竊取演算法允許空閒的執行緒從其它執行緒的雙端佇列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和佇列所屬執行緒發生競爭。例如下圖中,Thread2 從 Thread1 的佇列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。但是如果佇列中只有一個任務時還是會發生競爭。