1. 程式人生 > >JAVA學習筆記(併發程式設計 - 柒)- J.U.C元件2

JAVA學習筆記(併發程式設計 - 柒)- J.U.C元件2

J.U.C-FutureTask

在Java中一般通過繼承Thread類或者實現Runnable介面這兩種方式來建立執行緒,但是這兩種方式都有個缺陷,就是不能在執行完成後獲取執行的結果,因此Java 1.5之後提供了Callable和Future介面,通過它們就可以在任務執行完畢之後得到任務的執行結果。

而FutureTask則是J.U.C中的類,但不是AQS的子類,FutureTask是一個可刪除的非同步計算類。這個類提供了Future介面的的基本實現,使用相關方法啟動和取消計算,查詢計算是否完成,並檢索計算結果。只有在計算完成時才能使用get方法檢索結果;如果計算尚未完成,get方法將會阻塞。一旦計算完成,計算就不能重新啟動或取消(除非使用runAndReset方法呼叫計算)。

Runnable與Callable以及Future介面對比:

Runnable是一個介面,在它裡面只聲明瞭一個run()方法。由於run()方法返回值為void型別,所以在執行完任務之後無法返回任何結果:

public interface Runnable {
    public abstract void run();
}

Callable介面也只聲明瞭一個方法,這個方法叫做call()。Callable介面定義如下:

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
V call() throws Exception; }

可以看到Callable是個泛型介面,泛型V就是要call()方法返回的型別。Callable介面和Runnable介面很像,都可以被另外一個執行緒執行,但是正如前面所說的,Runnable不會返回資料也不能丟擲異常。

Future也是一個介面,Future介面代表非同步計算的結果,通過Future介面提供的方法可以檢視非同步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還可以取消執行。說白了Future就是對於具體的Runnable或者Callable任務的執行結果進行取消、查詢是否完成以及獲取執行結果。其中執行結果通過get方法獲取,該方法會阻塞直到任務返回結果。Future介面的定義如下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

在Future介面中聲明瞭5個方法,下面依次解釋每個方法的作用:

cancel()方法用來取消非同步任務的執行。如果非同步任務已經完成或者已經被取消,或者由於某些原因不能取消,則會返回false。如果任務還沒有被執行,則會返回true並且非同步任務不會被執行。如果任務已經開始執行了但是還沒有執行完成,若mayInterruptIfRunning為true,則會立即中斷執行任務的執行緒並返回true,若mayInterruptIfRunning為false,則會返回true且不會中斷任務執行執行緒。
isCanceled()方法用於判斷任務是否被取消,如果任務在結束(正常執行結束或者執行異常結束)前被取消則返回true,否則返回false。
isDone()方法用於判斷任務是否已經完成,如果完成則返回true,否則返回false。需要注意的是:任務執行過程中發生異常、任務被取消也屬於任務已完成,也會返回true。
get()方法用於獲取任務執行結果,如果任務還沒完成則會阻塞等待直到任務執行完成。如果任務被取消則會丟擲CancellationException異常,如果任務執行過程發生異常則會丟擲ExecutionException異常,如果阻塞等待過程中被中斷則會丟擲InterruptedException異常。
get(long timeout,Timeunit unit)是帶超時時間的get()版本,如果阻塞等待過程中超時則會丟擲TimeoutException異常。

綜上,Future主要提供了三種功能:

  1. 判斷任務是否完成;
  2. 能夠中斷任務;
  3. 能夠獲取任務執行結果。

因為Future只是一個介面,所以是無法直接用來建立物件使用的,因此就有了下面的FutureTask。FutureTask的父類是RunnableFuture,而RunnableFuture則繼承了Runnable和Future這兩個介面。所以由此可知,FutureTask最終也屬於是Callable型別的任務。如果往FutureTask的建構函式傳入Runnable的話,也會被轉換成Callable型別。

FutureTask繼承圖如下:
在這裡插入圖片描述
可以看到,FutureTask實現了RunnableFuture介面,則RunnableFuture介面繼承了Runnable介面和Future介面,所以FutureTask既能當做一個Runnable直接被Thread執行,也能作為Future用來得到Callable的計算結果。

使用場景:

假設有一個很費時的邏輯需要計算,並且需要返回計算的結果,但這個結果又不是馬上需要的。那麼這時就可以使用FutureTask,用另外一個執行緒去進行計算,而當前執行緒在得到這個計算結果之前,就可以去執行其他的操作,等到需要這個結果時再通過Future得到即可。

FutureTask有兩個構造器,支援傳入Callable和Runnable型別,在使用 Runnable 時,需要多指定一個返回結果型別:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

使用示例

1.Future基本使用示例:

@Slf4j
public class FutureExample {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newCachedThreadPool();
        // 使用lambda建立callable任務,使用Future接收任務執行的結果
        Future<String> future = executorService.submit(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);

            return "Done";
        });

        log.info("do something in main");
        Thread.sleep(1000);
        // 獲取執行結果
        String result = future.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}

2.FutureTask基本使用示例:

@Slf4j
public class FutureTaskExample {

    public static void main(String[] args) throws Exception {
        // 構建FutureTask例項,使用lambda建立callable任務
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            log.info("do something in callable");
            Thread.sleep(5000);

            return "Done";
        });

        ExecutorService executorService = Executors.newCachedThreadPool();
        executorService.execute(futureTask);

        log.info("do something in main");
        Thread.sleep(1000);
        // 獲取執行結果
        String result = futureTask.get();
        log.info("result: {}", result);
        executorService.shutdown();
    }
}

從以上兩個示例可以看到,Future和FutureTask的使用方式是很相似的,畢竟FutureTask就是Future的一個實現。

J.U.C-ForkJoin

Fork/Join框架是Java7提供了的一個用於並行執行任務的框架, 是一個把大任務分割成若干個小任務,最終彙總每個小任務結果後得到大任務結果的框架,其思想和map-reduce非常類似。

我們再通過Fork和Join這兩個單詞來理解下Fork/Join框架,Fork就是把一個大任務切分為若干子任務並行的執行,Join就是合併這些子任務的執行結果,最後得到這個大任務的結果。比如計算1+2+。。+10000,可以分割成10個子任務,每個子任務分別對1000個數進行求和,最終彙總這10個子任務的結果。Fork/Join的執行流程圖如下:
在這裡插入圖片描述

工作竊取演算法:

Fork/Join框架主要採用的是工作竊取(work-stealing)演算法,該演算法是指某個執行緒從其他佇列裡竊取任務來執行。工作竊取的執行流程圖如下:
在這裡插入圖片描述
那麼為什麼需要使用工作竊取演算法呢?假如我們需要做一個比較大的任務,我們可以把這個任務分割為若干互不依賴的子任務,為了減少執行緒間的競爭,於是把這些子任務分別放到不同的佇列裡,併為每個佇列建立一個單獨的執行緒來執行佇列裡的任務,執行緒和佇列一一對應,比如A執行緒負責處理A佇列裡的任務。但是有的執行緒會先把自己佇列裡的任務幹完,而其他執行緒對應的佇列裡還有任務等待處理。幹完活的執行緒與其等著,不如去幫其他執行緒幹活,於是它就去其他執行緒的佇列裡竊取一個任務來執行。而在這時它們會訪問同一個佇列,所以為了減少竊取任務執行緒和被竊取任務執行緒之間的競爭,通常會使用雙端佇列,被竊取任務執行緒永遠從雙端佇列的頭部拿任務執行,而竊取任務的執行緒永遠從雙端佇列的尾部拿任務執行。

工作竊取演算法的優點是充分利用執行緒進行平行計算,並減少了執行緒間的競爭,其缺點是在某些情況下還是存在競爭,比如雙端佇列裡只有一個任務時。並且消耗了更多的系統資源,比如建立多個執行緒和多個雙端佇列。

所以對於Fork/Join框架而言,當一個任務正在等待它使用join操作建立的子任務的結束時,執行這個任務的執行緒(工作執行緒)查詢其他未被執行的任務並開始它的執行。通過這種方式,執行緒充分利用它們的執行時間,從而提高了應用程式的效能。

為實現這個目標,Fork/Join框架執行的任務有以下侷限性:

  • 任務只能使用fork()和join()操作,作為同步機制。如果使用其他同步機制,工作執行緒不能執行其他任務,當它們在同步操作時。比如,在Fork/Join框架中,你使任務進入睡眠,那麼在這睡眠期間內,正在執行這個任務的工作執行緒將不會執行其他任務。
  • 任務不應該執行I/O操作,如讀或寫資料檔案。
  • 任務不能丟擲檢查異常,它必須包括必要的程式碼來處理它們。

Fork/Join框架的核心主要是以下兩個類:

  • ForkJoinPool:它實現ExecutorService介面和work-stealing演算法。它管理工作執行緒和提供關於任務的狀態和它們執行的資訊。
  • ForkJoinTask: 它是將在ForkJoinPool中執行的任務的基類。它提供在任務中執行fork()和join()操作的機制,並且這兩個方法控制任務的狀態。通常, 為了實現你的Fork/Join任務,你將實現兩個子類的子類的類:RecursiveAction對於沒有返回結果的任務和RecursiveTask 對於返回結果的任務。

Fork/Join使用示例,完成1+2+3+4…+n的計算,程式碼如下:

package org.zero.concurrency.demo.example.aqs;

import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

@Slf4j
public class ForkJoinTaskExample extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2;
    private int start;
    private int end;

    private ForkJoinTaskExample(int start, int end) {
        this.start = start;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        int sum = 0;

        //如果任務足夠小就直接計算任務
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute) {
            for (int i = start; i <= end; i++) {
                sum += i;
            }
        } else {
            // 如果任務大於閾值,就分裂成兩個子任務計算
            int middle = (start + end) / 2;
            ForkJoinTaskExample leftTask = new ForkJoinTaskExample(start, middle);
            ForkJoinTaskExample rightTask = new ForkJoinTaskExample(middle + 1, end);

            // 執行子任務
            leftTask.fork();
            rightTask.fork();

            // 等待任務執行結束合併其結果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            // 合併子任務
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkjoinPool = new ForkJoinPool();

        //生成一個計算任務,計算1+2+3+4...+100
        ForkJoinTaskExample task = new ForkJoinTaskExample(1, 100);

        //執行一個任務
        Future<Integer> result = forkjoinPool.submit(task);

        try {
            log.info("result:{}", result.get());
        } catch (Exception e) {
            log.error("exception", e);
        }
    }
}

J.U.C-BlockingQueue

在新增的Concurrent包中,BlockingQueue很好的解決了多執行緒中,如何高效安全“傳輸”資料的問題,從名字也可以知道它是執行緒安全的。通過這些高效並且執行緒安全的佇列類,為我們快速搭建高質量的多執行緒程式帶來極大的便利。

首先,最基本的來說, BlockingQueue 是一個先進先出的佇列(Queue),為什麼說是阻塞(Blocking)的呢?是因為 BlockingQueue 支援當獲取佇列元素但是佇列為空時,會阻塞等待佇列中有元素再返回;也支援新增元素時,如果佇列已滿,那麼等到佇列可以放入新元素時再放入。所以 BlockingQueue 主要應用於生產者消費者場景。
在這裡插入圖片描述
BlockingQueue 是一個介面,繼承自 Queue,所以其實現類也可以作為 Queue 的實現來使用,而 Queue 又繼承自 Collection 介面。

BlockingQueue 對插入操作、移除操作、獲取元素操作提供了四種不同的方法用於不同的場景中使用,總結如下表:

- Throws exception Special value Blocks Times out
Insert add(e) offer(e) put(e) offer(e, time, unit)
Insert remove() poll() take() poll(time, unit)
Examine element() peek() not applicable not applicable

說明:

  1. Throws Exceptions :如果不能立即執行就丟擲異常
  2. Special Value:如果不能立即執行就返回一個特殊的值(null 或 true/false,取決於具體的操作)
  3. Blocks:如果不能立即執行就阻塞等待此操作,直到這個操作成功
  4. Times Out:如果不能立即執行就阻塞一段時間,直到成功或者超時指定時間

BlockingQueue 的實現類:

ArrayBlockingQueue:它是一個有界的阻塞佇列,內部實現是陣列,需在初始化時指定容量大小,一旦指定大小就不能再變。採用FIFO方式儲存元素:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    /** The queued items */
    final Object[] items;
    ...
}    

DelayQueue:阻塞內部元素,DelayQueue內部元素必須實現Delayed介面,Delayed介面又繼承了Comparable介面,原因在於DelayQueue內部元素需要排序,一般情況下按元素過期時間優先順序排序:

public interface Delayed extends Comparable<Delayed> {
    long getDelay(TimeUnit unit);
}

DalayQueue內部採用PriorityQueue與ReentrantLock實現:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    ...
}

LinkedBlockingQueue:使用獨佔鎖實現的阻塞佇列,大小配置可選,如果初始化時指定了大小,那麼它就是有邊界的。不指定就無邊界(最大整型值)。內部實現是連結串列,採用FIFO形式儲存資料。

public LinkedBlockingQueue() {
    // 不指定大小,無邊界採用預設值,最大整型值
    this(Integer.MAX_VALUE);
}

PriorityBlockingQueue:帶優先順序的×××阻塞佇列,無邊界佇列,允許插入null。插入的物件必須實現Comparator介面,佇列優先順序的排序規則就是按照我們對Comparable介面的實現來指定的。我們可以從PriorityBlockingQueue中獲取一個迭代器,但這個迭代器並不保證能按照優先順序的順序進行迭代:

public class PriorityBlockingQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    ...

    public boolean add(E e) {
        return offer(e);
    }

    public boolean offer(E e) {
        if (e == null)
            throw new NullPointerException();
        final ReentrantLock lock = this.lock;
        lock.lock();
        int n, cap;
        Object[] es;
        while ((n = size) >= (cap = (es = queue).length))
            tryGrow(es, cap);
        try {
            //必須實現Comparator介面
            final Comparator<? super E> cmp;
            if ((cmp = comparator) == null)
                siftUpComparable(n, e, es);
            else
                siftUpUsingComparator(n, e, es, cmp);
            size = n + 1;
            notEmpty.signal();
        } finally {
            lock.unlock();
        }
        return true;
    }
    ...
}

SynchronousQueue:同步阻塞佇列,只能插入一個元素,×××非快取佇列,不儲存元素。其內部並沒有資料快取空間,你不能呼叫peek()方法來看佇列中是否有資料元素,當然遍歷這個佇列的操作也是不允許的:

public class SynchronousQueue<E> extends AbstractQueue<E>
    implements BlockingQueue<E>, java.io.Serializable {
    ...
}