1. 程式人生 > >Java多執行緒與高併發:java.util.concurrent包

Java多執行緒與高併發:java.util.concurrent包

面試官:你用過JUC的哪些工具類?
前面從基礎開始,到執行緒安全的實現、物件的釋出與共享,涉及到很多執行緒安全的類與工具,JDK1.5開始,提供了更加方便強大的執行緒同步管理工具包JUC讓我們使用,這個也是面試與實踐中的重點,本文結合原始碼作一些比較落地的講解。

 

image

 

報告面試官,JUC中有非常多的類,將部分類按功能進行分類,分別是:

  1. 之前提到過的原子atomic
  2. 比synchronized功能更強大的lock
  3. 執行緒排程管理工具
  4. 執行緒安全與併發工具集合
  5. 執行緒池

AQS

AbstractQueuedSynchronizer,即佇列同步器。它是構建鎖或者其他同步元件的基礎框架,它是JUC併發包中的核心基礎元件。

JUC大大提高了Java的併發能力,AQS是JUC的核心。

原理

 

image.png


同步佇列:AQS通過內建的FIFO同步佇列來完成資源獲取執行緒的排隊工作,如果當前執行緒獲取同步狀態失敗(鎖)時,AQS則會將當前執行緒以及等待狀態等資訊構造成一個節點(Node)並將其加入同步佇列,同時會阻塞當前執行緒,當同步狀態釋放時,則會把節點中的執行緒喚醒,使其再次嘗試獲取同步狀態。
繼承實現:AQS的主要使用方式是繼承,子類通過繼承同步器並實現它的抽象方法acquire/release來管理同步狀態。
同步狀態維護:AQS使用一個int型別的成員變數state來表示同步狀態,當state > 0時表示已經獲取了鎖,當state = 0時表示釋放了鎖。它提供了三個方法(getState()、setState(int newState)、compareAndSetState(int expect,int update))來對同步狀態state進行操作,當然AQS可以確保對state的操作是安全的。

 

CountDownLatch

計數器閉鎖是一個能阻塞主執行緒,讓其他執行緒滿足特定條件下主執行緒再繼續執行的執行緒同步工具。

使用

public class CountDownLatchTest {

    private static final int COUNT = 1000;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        CountDownLatch countDownLatch = new CountDownLatch(COUNT);
        for (int i = 0; i < COUNT; i++) { //countDown方法的執行次數一定要與countDownLatch的計數器數量一致,否則無法將計數器清空導致主執行緒無法繼續執行
            int finalI = i;
            executorService.execute(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println(finalI);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    countDownLatch.countDown();
                }
            });
        }
        countDownLatch.await(1, TimeUnit.SECONDS); //主執行緒只等1秒,超過之後繼續執行主執行緒
        executorService.shutdown(); //當正在執行的執行緒執行完成之後再關閉而不是立即停止執行緒
        System.out.println("done!");
    }
}

這段程式先設定CountDownLatch為100,然後在其他執行緒中呼叫100次countDown方法,隨後主程式在等待100次被執行完成之後,繼續執行主執行緒程式碼

原理

image.png

 

圖中,A為主執行緒,A首先設定計數器的數到AQS的state中,當呼叫await方法之後,A執行緒阻塞,隨後每次其他執行緒呼叫countDown的時候,將state減1,直到計數器為0的時候,A執行緒繼續執行。

使用場景

1.平行計算:把任務分配給不同執行緒之後需要等待所有執行緒計算完成之後主執行緒才能彙總得到最終結果
2.模擬併發:可以作為併發次數的統計變數,當任意多個執行緒執行完成併發任務之後統計一次即可

Semaphore

訊號量是一個能阻塞執行緒且能控制統一時間請求的併發量的工具。比如能保證同時執行的執行緒最多200個,模擬出穩定的併發量。

使用

public class CountDownLatchTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(3); //配置只能釋出3個執行許可證
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            executorService.execute(() -> {
                try {
                    semaphore.acquire(3); //獲取3個執行許可,如果獲取不到會一直等待,使用tryAcquire則不會等待
                    Thread.sleep(1000);
                    System.out.println(finalI);
                    semaphore.release(3);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
        executorService.shutdown();
    }
}

由於同時獲取3個許可,所以有即使開啟了100個執行緒,但是每秒只能執行一個任務

原理

new Semaphore(3)傳入的3就是AQS中state的值,也是許可數的總數,在呼叫acquire時,檢測此時許可數如果小於0,就將被阻塞,然後將執行緒構建Node進入AQS佇列

//AQS的骨架,其中tryAcquireShared將呼叫到Semaphore中的nonfairTryAcquireShared
//一般常用非公平的訊號量,非公平訊號量是指在獲取許可時直接迴圈獲取,如果獲取失敗,才會入列
//公平的訊號量在獲取許可時首先要檢視等待佇列中是否已有執行緒,如果有則將執行緒入列等待
private void doAcquireSharedInterruptibly(int arg)
    throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

// 如果remaining小於0,許可獲取失敗,執行shouldParkAfterFailedAcquire方法入列然後等待
// 如果remaining大於0,許可獲取成功,且更新state成功,那麼則setHeadAndPropagate並且立即返回
final int nonfairTryAcquireShared(int acquires) {
    for (;;) {
        int available = getState();
        int remaining = available - acquires;
        if (remaining < 0 ||
            compareAndSetState(available, remaining))
            return remaining;
    }
}

使用場景

資料庫連線併發數,如果超過併發數,等待(acqiure)或者丟擲異常(tryAcquire)

CyclicBarrier

可以讓一組執行緒相互等待,當每個執行緒都準備好之後,所有執行緒才繼續執行的工具類

使用

public class CyclicBarrierTest {
    private static CyclicBarrier cyclicBarrier = new CyclicBarrier(5, () -> {
        System.out.println("ready done callback!");
    });

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            int finalI = i;
            Thread.sleep(1000);
            executorService.execute(() -> {
                try {
                    System.out.println(finalI + "ready!");
                    cyclicBarrier.await();
//                    cyclicBarrier.await(2000, TimeUnit.MILLISECONDS); // 如果某個執行緒等待超過2秒就報錯
                    System.out.println(finalI + "go!");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });

        }
    }
}

原理

image.png

 

與CountDownLatch類似,都是通過計數器實現的,當某個執行緒呼叫await之後,計數器減1,當計數器大於0時將等待的執行緒包裝成AQS的Node放入等待佇列中,當計數器為0時將等待佇列中的Node拿出來執行。
與CountDownLatch的區別:
1.CDL是一個執行緒等其他執行緒,CB是多個執行緒相互等待
2.CB的計數器能重複使用,呼叫多次

使用場景

1.CyclicBarrier可以用於多執行緒計算資料,最後合併計算結果的應用場景。比如我們用一個Excel儲存了使用者所有銀行流水,每個Sheet儲存一個帳戶近一年的每筆銀行流水,現在需要統計使用者的日均銀行流水,先用多執行緒處理每個sheet裡的銀行流水,都執行完之後,得到每個sheet的日均銀行流水,最後,再用barrierAction用這些執行緒的計算結果,計算出整個Excel的日均銀行流水。
2.有四個遊戲玩家玩遊戲,遊戲有三個關卡,每個關卡必須要所有玩家都到達後才能允許通過。其實這個場景裡的玩家中如果有玩家A先到了關卡1,他必須等到其他所有玩家都到達關卡1時才能通過,也就是說執行緒之間需要相互等待。

ReentrantLock

名為可重入鎖,其實synchronized也可重入,是JDK層級上的一個併發控制工具

使用

public class ConcurrencyTest {
    private static final int THREAD_COUNT = 5000;
    private static final int CONCURRENT_COUNT = 200;
    private static int count = 0;
    private static Lock lock = new ReentrantLock();

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(CONCURRENT_COUNT);
        CountDownLatch countDownLatch = new CountDownLatch(THREAD_COUNT);
        for (int i = 0; i < THREAD_COUNT; i++) {
            executorService.execute(() -> {
                try {
                    semaphore.acquire();
                    add();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                countDownLatch.countDown();
            });
        }
        countDownLatch.await();
        executorService.shutdown();
        System.out.println(count);
    }


    private static void add() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock();
        }
    }
}

原理

參考:https://www.jianshu.com/p/fe027772e156

// 以公平鎖為例,從lock.lock()開始研究
final void lock() { acquire(1);}

public final void acquire(int arg) {
    if (!tryAcquire(arg) && // 首先通過公平或者非公平方式嘗試獲取鎖
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 然後構建一個Node放入佇列中並等待執行的時機
        selfInterrupt();
}

// 公平鎖設定鎖執行狀態的邏輯
protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { //如果state是0,就是當前的鎖沒有人佔有
        if (!hasQueuedPredecessors() && // 公平鎖的核心邏輯,判斷佇列是否有排在前面的執行緒在等待鎖,非公平鎖就沒這個條件判斷
            compareAndSetState(0, acquires)) { // 如果佇列沒有前面的執行緒,使用CAS的方式修改state
            setExclusiveOwnerThread(current); // 將執行緒記錄為獨佔鎖的執行緒
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) { // 因為ReentrantLock是可重入的,執行緒可以不停地lock來增加state的值,對應地需要unlock來解鎖,直到state為零
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

// 接下來要執行的acquireQueued如下
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) { // 再次使用公平鎖邏輯判斷是否將Node作為頭結點立即執行
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

與synchronized的區別

1.用法:synchronized既可以很方便的加在方法上,也可以載入特定程式碼塊上,而lock需要顯示地指定起始位置和終止位置。
2.實現:synchronized是依賴於JVM實現的,而ReentrantLock是JDK實現的
3.效能:synchronized和lock其實已經相差無幾,其底層實現已經差不多了。但是如果你是Android開發者,使用synchronized還是需要考慮其效能差距的。
4.功能:ReentrantLock功能更強大。
4.1 ReentrantLock可以指定是公平鎖還是非公平鎖,而synchronized只能是非公平鎖,所謂的公平鎖就是先等待的執行緒先獲得鎖
4.2 ReentrantLock提供了一個Condition(條件)類,用來實現分組喚醒需要喚醒的執行緒們,而不是像synchronized要麼隨機喚醒一個執行緒要麼喚醒全部執行緒
4.3 ReentrantLock提供了一種能夠中斷等待鎖的執行緒的機制,通過lock.lockInterruptibly()來實現這個機制
我們控制執行緒同步的時候,優先考慮synchronized,如果有特殊需要,再進一步優化。ReentrantLock如果用的不好,不僅不能提高效能,還可能帶來災難。

Condition

條件物件的意義在於對於一個已經獲取鎖的執行緒,如果還需要等待其他條件才能繼續執行的情況下,才會使用Condition條件物件。
與ReentrantLock結合使用,類似wait與notify。

使用

public class ConditionTest {

    public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        Thread thread1 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " wait for condition");
                try {
                    condition.await(); // 1.將執行緒1放入到Condition佇列中等待被喚醒,且立即釋放鎖
                    System.out.println(Thread.currentThread().getName() + " continue"); // 3.執行緒2執行完畢釋放鎖,此時執行緒1已經在AQS等待佇列中,則立即執行
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
            } finally {
                lock.unlock();
            }
        });
        Thread thread2 = new Thread(() -> {
            lock.lock();
            try {
                System.out.println(Thread.currentThread().getName() + " run");
                System.out.println(Thread.currentThread().getName() + " sleep 1 secs");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    System.err.println(Thread.currentThread().getName() + " interrupted");
                    Thread.currentThread().interrupt();
                }
                condition.signalAll(); // 2.執行緒2獲得鎖,signalAll將Condition中的等待佇列全部取出並加入到AQS中
            } finally {
                lock.unlock();
            }
        });
        thread1.start();
        thread2.start();
    }

}

輸出結果為

Thread-0 run
Thread-0 wait for condition
Thread-1 run
Thread-1 sleep 1 secs
Thread-0 continue

使用場景

可參看第一篇中PDF資料中《執行緒間通訊》一節

Future、FutureTask

不是AQS的子類,但是能拿到執行緒執行的結果非常有用。

Callable與Runnable

java.lang.Runnable

public interface Runnable {
    public abstract void run();
}

由於run()方法返回值為void型別,所以在執行完任務之後無法返回任何結果
要使用的話直接實現就可以了

java.util.concurrent.Callable

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

泛型介面,call()函式返回的型別就是傳遞進來的V型別,同時能結合lambda使用
要使用的話要結合ExecutorService的如下方法使用

<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);

Future介面

FutureTask<V> implements RunnableFuture<V>
RunnableFuture<V> extends Runnable, Future<V>

Future是Java 5新增的類,用來描述一個非同步計算的結果。你可以使用isDone方法檢查計算是否完成,或者使用get阻塞住呼叫執行緒,直到計算完成返回結果,你也可以使用cancel方法停止任務的執行。

public class FutureTest {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        Future<String> future = executorService.submit(() -> {
            try {
                System.out.println("doing");
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "done";
        });
        System.out.println(future.get());
    }
}

介面畢竟是介面,只能被賦值,不能直接new出來,所以可以new FutureTask直接來建立Future任務

FutureTask類

public class FutureTaskTest {


    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executorService = Executors.newCachedThreadPool();
        FutureTask<String> futureTask = new FutureTask<>(() -> {
            System.out.println("doing");
            Thread.sleep(1000);
            return "down";
        });
        executorService.submit(futureTask);

//        new Thread(futureTask).start();
        System.out.println(futureTask.get());
        executorService.shutdown();
    }
}

CompletableFuture類

但其實在專案中使用到最多的Future類是1.8提供的這個類,因為雖然Future以及相關使用方法提供了非同步執行任務的能力,但是對於結果的獲取卻是很不方便,只能通過阻塞方式得到任務的結果,阻塞的方式顯然和我們的非同步程式設計的初衷相違背。Java CompletableFuture 詳解
其實簡單來說,原理就是通過自己維護一套執行緒同步與等待的機制與執行緒池去實現這樣的非同步任務處理機制,下面的例子是開發中最經常用到的,等待所有任務完成,繼續處理資料的例子。還有非同步任務依賴的例子請參看上文連線。

public class CompletableFutureTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<String> string1Future = CompletableFuture.supplyAsync(() -> {
            System.out.println("doing string1");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done string1");
            return "string1";
        });
        CompletableFuture<String> string2Future = CompletableFuture.supplyAsync(() -> {
            System.out.println("doing string2");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("done string2");
            return "string2";
        });

        CompletableFuture.allOf(string1Future, string2Future).join();
        System.out.println(string1Future.get() + "and" + string2Future.get());
    }
}

BlockingQueue

假設我們有若干生產者執行緒,另外又有若干個消費者執行緒。如果生產者執行緒需要把準備好的資料共享給消費者執行緒,利用佇列的方式來傳遞資料,就可以很方便地解決他們之間的資料共享問題。
但如果生產者和消費者在某個時間段內,萬一發生資料處理速度不匹配的情況呢?理想情況下,如果生產者產出資料的速度大於消費者消費的速度,並且當生產出來的資料累積到一定程度的時候,那麼生產者暫停等待一下(阻塞生產者執行緒)或者繼續將產品放入佇列中。
然而,在concurrent包釋出以前,在多執行緒環境下,我們每個程式設計師都必須去自己控制這些細節,尤其還要兼顧效率和執行緒安全,而這會給我們的程式帶來不小的複雜度

在後文的執行緒池相關內容中會提到,執行緒池也使用到了這個工具完成不同需求。

使用方式、子類的詳細介紹參看這裡

Fork Join框架

fork join框架是JDK7中出現的一款高效的工具,Java開發人員可以通過它充分利用現代伺服器上的多處理器。它是專門為了那些可以遞迴劃分成許多子模組設計的,目的是將所有可用的處理能力用來提升程式的效能。fork join框架一個巨大的優勢是它使用了工作竊取演算法,可以完成更多工的工作執行緒可以從其它執行緒中竊取任務來執行
但這樣會要額外地對任務分派執行緒進行管理,無形地會增加管理的難度和複雜度,還可能碰到資源競爭導致的同步操作與效能損耗

參考

http://coding.imooc.com/class/195.html
以及其他超連線引用

號外號外

最近在總結一些針對Java面試相關的知識點,感興趣的朋友可以一起維護~
地址:https://github.com/xbox1994/2018-Java-Interview

轉載自:Java多執行緒與高併發(四):java.util.concurrent包



作者:Gallrax
連結:https://www.jianshu.com/p/46728d6bc6b2
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授