多執行緒:瞭解一下ForkJoin、FutureTask、BlockingQueue
先了解一下這三種類
FutureTask
在介紹 Callable 時我們知道它可以有返回值,返回值通過 Future 進行封裝。FutureTask 實現了 RunnableFuture 介面,該介面繼承自 Runnable 和 Future 介面,這使得 FutureTask 既可以當做一個任務執行,也可以有返回值。
public class FutureTask<V> implements RunnableFuture<V>
public interface RunnableFuture<V> extends Runnable, Future<V>
FutureTask 可用於非同步獲取執行結果或取消執行任務的場景。當一個計算任務需要執行很長時間,那麼就可以用 FutureTask 來封裝這個任務,主執行緒在完成自己的任務之後再去獲取結果。get()方法會阻塞當前主執行緒,直到call()結束並返回。
public class FutureTaskExample { public static void main(String[] args) throws ExecutionException, InterruptedException { FutureTask<Integer> futureTask = new FutureTask<Integer>(new Callable<Integer>() { @Override public Integer call() throws Exception { int result = 0; for (int i = 0; i < 100; i++) { Thread.sleep(10); result += i; } return result; } }); Thread computeThread = new Thread(futureTask); computeThread.start(); Thread otherThread = new Thread(() -> { System.out.println("other task is running..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } }); otherThread.start(); System.out.println(futureTask.get()); } }
other task is running...
4950
BlockingQueue
java.util.concurrent.BlockingQueue 介面有以下阻塞佇列的實現:
- FIFO 佇列 :LinkedBlockingQueue、ArrayBlockingQueue(固定長度)
- 優先順序佇列 :PriorityBlockingQueue
提供了阻塞的 take() 和 put() 方法:如果佇列為空 take() 將阻塞,直到佇列中有內容;如果佇列為滿 put() 將阻塞,直到佇列有空閒位置。
使用 BlockingQueue 實現生產者消費者問題
public class ProducerConsumer {
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(5);
private static class Producer extends Thread {
@Override
public void run() {
try {
queue.put("product");
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("produce..");
}
}
private static class Consumer extends Thread {
@Override
public void run() {
try {
String product = queue.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.print("consume..");
}
}
}
public static void main(String[] args) {
for (int i = 0; i < 2; i++) {
Producer producer = new Producer();
producer.start();
}
for (int i = 0; i < 5; i++) {
Consumer consumer = new Consumer();
consumer.start();
}
for (int i = 0; i < 3; i++) {
Producer producer = new Producer();
producer.start();
}
}
produce..produce..consume..consume..produce..consume..produce..consume..produce..consume..
ForkJoin
主要用於平行計算中,和 MapReduce 原理類似,都是把大的計算任務拆分成多個小任務平行計算。
public class ForkJoinExample extends RecursiveTask<Integer> {
private final int threshold = 5;
private int first;
private int last;
public ForkJoinExample(int first, int last) {
this.first = first;
this.last = last;
}
@Override
protected Integer compute() {
int result = 0;
if (last - first <= threshold) {
// 任務足夠小則直接計算
for (int i = first; i <= last; i++) {
result += i;
}
} else {
// 拆分成小任務
int middle = first + (last - first) / 2;
ForkJoinExample leftTask = new ForkJoinExample(first, middle);
ForkJoinExample rightTask = new ForkJoinExample(middle + 1, last);
leftTask.fork();
rightTask.fork();
result = leftTask.join() + rightTask.join();
}
return result;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
ForkJoinExample example = new ForkJoinExample(1, 10000);
ForkJoinPool forkJoinPool = new ForkJoinPool();
Future result = forkJoinPool.submit(example);
System.out.println(result.get());
}
ForkJoin 使用 ForkJoinPool 來啟動,它是一個特殊的執行緒池,執行緒數量取決於 CPU 核數。
public class ForkJoinPool extends AbstractExecutorService
ForkJoinPool 實現了工作竊取演算法來提高 CPU 的利用率。每個執行緒都維護了一個雙端佇列,用來儲存需要執行的任務。工作竊取演算法允許空閒的執行緒從其它執行緒的雙端佇列中竊取一個任務來執行。竊取的任務必須是最晚的任務,避免和佇列所屬執行緒發生競爭。例如下圖中,Thread2 從 Thread1 的佇列中拿出最晚的 Task1 任務,Thread1 會拿出 Task2 來執行,這樣就避免發生競爭。但是如果佇列中只有一個任務時還是會發生競爭。