1. 程式人生 > >Java併發工具類詳解

Java併發工具類詳解

在JDK的併發包裡提供了幾個非常有用的併發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種併發流程控制的手段,Exchanger工具類則提供了線上程間交換資料的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。

等待多執行緒完成的CountDownLatch

CountDownLatch允許一個或多個執行緒等待其他執行緒完成操作。
假如有這樣一個需求:我們需要解析一個Excel裡多個sheet的資料,此時可以考慮使用多執行緒,每個執行緒解析一個sheet裡的資料,等到所有的sheet都解析完之後,程式需要提示解析完成(或者彙總結果)。在這個需求中,要實現主執行緒等待所有執行緒完成sheet的解析操作,最簡單的做法是使用join()方法,如程式碼清單8-1所示。

 

 
  1. import java.util.Random;

  2. import java.util.concurrent.atomic.AtomicInteger;

  3.  
  4. public class JoinCountDownLatchTest {

  5. private static Random sr=new Random(47);

  6. private static AtomicInteger result=new AtomicInteger(0);

  7. private static int threadCount=10;

  8. private static class Parser implements Runnable{

  9. String name;

  10. public Parser(String name){

  11. this.name=name;

  12. }

  13. @Override

  14. public void run() {

  15. int sum=0;

  16. int seed=Math.abs(sr.nextInt()) ;

  17. Random r=new Random(47);

  18. for(int i=0;i<100;i++){

  19. sum+=r.nextInt(seed);

  20. }

  21. result.addAndGet(sum);

  22. System.out.println(name+"執行緒的解析結果:"+sum);

  23. }

  24. }

  25. public static void main(String[] args) throws InterruptedException {

  26. Thread[] threads=new Thread[threadCount];

  27. for(int i=0;i<threadCount;i++){

  28. threads[i]=new Thread(new Parser("Parser-"+i));

  29. }

  30. for(int i=0;i<threadCount;i++){

  31. threads[i].start();

  32. }

  33. for(int i=0;i<threadCount;i++){

  34. threads[i].join();

  35. }

  36. System.out.println("所有執行緒解析結束!");

  37. System.out.println("所有執行緒的解析結果:"+result);

  38. }

  39. }

輸出:

 

Parser-1執行緒的解析結果:-2013585201
Parser-0執行緒的解析結果:1336321192
Parser-2執行緒的解析結果:908136818
Parser-5執行緒的解析結果:-1675827227
Parser-3執行緒的解析結果:1638121055
Parser-4執行緒的解析結果:1513365118
Parser-6執行緒的解析結果:489607354
Parser-8執行緒的解析結果:1513365118
Parser-7執行緒的解析結果:-1191966831
Parser-9執行緒的解析結果:-912399159
所有執行緒解析結束!
所有執行緒的解析結果:1605138237
join用於讓當前執行執行緒等待join執行緒執行結束。其實現原理是不停檢查join執行緒是否存活,如果join執行緒存活則讓當前執行緒永遠等待。其中,wait(0)表示永遠等待下去,程式碼片段如下。

 

 
  1. public class Thread implements Runnable {

  2. ......

  3. public final void join() throws InterruptedException {

  4. join(0);

  5. }

  6. public final synchronized void join(long millis)

  7. throws InterruptedException {

  8. long base = System.currentTimeMillis();

  9. long now = 0;

  10.  
  11. if (millis < 0) {

  12. throw new IllegalArgumentException("timeout value is negative");

  13. }

  14.  
  15. if (millis == 0) {//執行到這裡

  16. while (isAlive()) {

  17. wait(0);//main執行緒永遠等待join執行緒

  18. }

  19. } else {

  20. while (isAlive()) {

  21. long delay = millis - now;

  22. if (delay <= 0) {

  23. break;

  24. }

  25. wait(delay);

  26. now = System.currentTimeMillis() - base;

  27. }

  28. }

  29. }

  30. ......

  31. }

 

直到join執行緒中止後,執行緒的this.notifyAll()方法會被呼叫,呼叫notifyAll()方法是在JVM裡實現的,所以在JDK裡看不到,大家可以檢視JVM原始碼。

在JDK 1.5之後的併發包中提供的CountDownLatch也可以實現join的功能,並且比join的功能更多,如程式碼清單8-2所示。

 

 
  1. import java.util.Random;

  2. import java.util.concurrent.CountDownLatch;

  3. import java.util.concurrent.atomic.AtomicInteger;

  4.  
  5. public class CountDownLatchTest {

  6. private static Random sr=new Random(47);

  7. private static AtomicInteger result=new AtomicInteger(0);

  8. private static int threadCount=10;//執行緒數量

  9. private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch

  10. private static class Parser implements Runnable{

  11. String name;

  12. public Parser(String name){

  13. this.name=name;

  14. }

  15. @Override

  16. public void run() {

  17. int sum=0;

  18. int seed=Math.abs(sr.nextInt()) ;

  19. Random r=new Random(47);

  20. for(int i=0;i<100;i++){

  21. sum+=r.nextInt(seed);

  22. }

  23. result.addAndGet(sum);

  24. System.out.println(name+"執行緒的解析結果:"+sum);

  25. countDown.countDown();//注意這裡

  26. }

  27. }

  28. public static void main(String[] args) throws InterruptedException {

  29. Thread[] threads=new Thread[threadCount];

  30. for(int i=0;i<threadCount;i++){

  31. threads[i]=new Thread(new Parser("Parser-"+i));

  32. }

  33. for(int i=0;i<threadCount;i++){

  34. threads[i].start();

  35. }

  36. /*

  37. for(int i=0;i<threadCount;i++){

  38. threads[i].join();

  39. }*/

  40. countDown.await();//將join改為使用CountDownLatch

  41. System.out.println("所有執行緒解析結束!");

  42. System.out.println("所有執行緒的解析結果:"+result);

  43. }

  44. }

輸出:

 

Parser-0執行緒的解析結果:1336321192
Parser-1執行緒的解析結果:-2013585201
Parser-2執行緒的解析結果:-1675827227
Parser-4執行緒的解析結果:1638121055
Parser-3執行緒的解析結果:908136818
Parser-5執行緒的解析結果:1513365118
Parser-7執行緒的解析結果:489607354
Parser-6執行緒的解析結果:1513365118
Parser-8執行緒的解析結果:-1191966831
Parser-9執行緒的解析結果:-912399159
所有執行緒解析結束!
所有執行緒的解析結果:1605138237

CountDownLatch的建構函式接收一個int型別的引數作為計數器,如果你想等待N個點完成,這裡就傳入N。
當我們呼叫CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前執行緒,直到N變成零。由於countDown方法可以用在任何地方,所以這裡說的N個點,可以是N個執行緒,也可以是1個執行緒裡的N個執行步驟。用在多個執行緒時,只需要把這個CountDownLatch的引用傳遞到執行緒裡即可。
如果有某個解析sheet的執行緒處理得比較慢,我們不可能讓主執行緒一直等待,所以可以使用另外一個帶指定時間的await方法——await(long time,TimeUnit unit),這個方法等待特定時間後,就會不再阻塞當前執行緒。join也有類似的方法。
注意:計數器必須大於等於0,只是等於0時候,計數器就是零,呼叫await方法時不會阻塞當前執行緒。CountDownLatch不可能重新初始化或者修改CountDownLatch物件的內部計數器的值。一個執行緒呼叫countDown方法happen-before,另外一個執行緒呼叫await方法。

 

 
  1. public class CountDownLatch {

  2. /**Synchronization control For CountDownLatch. Uses AQS state to represent count.*/

  3. private static final class Sync extends AbstractQueuedSynchronizer {

  4. private static final long serialVersionUID = 4982264981922014374L;

  5.  
  6. Sync(int count) {

  7. setState(count);//初始化同步狀態

  8. }

  9.  
  10. int getCount() {

  11. return getState();

  12. }

  13.  
  14. protected int tryAcquireShared(int acquires) {

  15. return (getState() == 0) ? 1 : -1;

  16. }

  17.  
  18. protected boolean tryReleaseShared(int releases) {

  19. // Decrement count; signal when transition to zero

  20. for (;;) {

  21. int c = getState();

  22. if (c == 0)

  23. return false;

  24. int nextc = c-1;

  25. if (compareAndSetState(c, nextc))

  26. return nextc == 0;

  27. }

  28. }

  29. }

  30.  
  31. private final Sync sync;//組合一個同步器(AQS)

  32.  
  33. public CountDownLatch(int count) {

  34. if (count < 0) throw new IllegalArgumentException("count < 0");

  35. this.sync = new Sync(count);//初始化同步狀態

  36. }

  37. /*Causes the current thread to wait until the latch has counted down to

  38.      * zero, unless the thread is {@linkplain Thread#interrupt interrupted}.*/

  39. public void await() throws InterruptedException {

  40. sync.acquireSharedInterruptibly(1);//

  41. }

  42.  
  43. public boolean await(long timeout, TimeUnit unit)

  44. throws InterruptedException {

  45. return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));

  46. }

  47. public void countDown() {

  48. sync.releaseShared(1);//釋放同步狀態

  49. }

  50.  
  51. public long getCount() {

  52. return sync.getCount();

  53. }

  54.  
  55. public String toString() {

  56. return super.toString() + "[Count = " + sync.getCount() + "]";

  57. }

  58. }

 

同步屏障CyclicBarrier

CyclicBarrier的字面意思是可迴圈使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組執行緒到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個執行緒到達屏障時,屏障才會開門,所有被屏障攔截的執行緒才會繼續執行。

CyclicBarrier預設的構造方法是CyclicBarrier(int parties),其引數表示屏障攔截的執行緒數量,每個執行緒呼叫await方法告訴CyclicBarrier我已經到達了屏障,然後當前執行緒被阻塞。

 

 
  1. import java.util.Random;

  2. import java.util.concurrent.CyclicBarrier;

  3. import java.util.concurrent.atomic.AtomicInteger;

  4.  
  5.  
  6. public class CyclicBarrierTest {

  7.  
  8. private static Random sr=new Random(47);

  9. private static AtomicInteger result=new AtomicInteger(0);

  10. private static int threadCount=10;

  11. //屏障後面執行彙總

  12. private static CyclicBarrier barrier=new CyclicBarrier(threadCount,new Accumulate());

  13. private static class Parser implements Runnable{

  14. String name;

  15. public Parser(String name){

  16. this.name=name;

  17. }

  18. @Override

  19. public void run() {

  20. int sum=0;

  21. int seed=Math.abs(sr.nextInt()) ;

  22. Random r=new Random(47);

  23. for(int i=0;i<(seed%100*100000);i++){

  24. sum+=r.nextInt(seed);

  25. }

  26. result.addAndGet(sum);

  27. System.out.println(System.currentTimeMillis()+"-"+name+"執行緒的解析結果:"+sum);

  28. try {

  29. barrier.await();

  30. System.out.println(System.currentTimeMillis()+"-"+name+"執行緒越過屏障!");

  31. } catch (Exception e) {

  32. e.printStackTrace();

  33. }

  34. }

  35. }

  36. static class Accumulate implements Runnable{

  37. @Override

  38. public void run() {

  39. System.out.println("所有執行緒解析結束!");

  40. System.out.println("所有執行緒的解析結果:"+result);

  41. }

  42. }

  43. public static void main(String[] args) throws InterruptedException {

  44. Thread[] threads=new Thread[threadCount];

  45. for(int i=0;i<threadCount;i++){

  46. threads[i]=new Thread(new Parser("Parser-"+i));

  47. }

  48. for(int i=0;i<threadCount;i++){

  49. threads[i].start();

  50. }

  51. }

  52. }

輸出:

 

1471866228774-Parser-4執行緒的解析結果:631026992
1471866228930-Parser-3執行緒的解析結果:-372785277
1471866228961-Parser-1執行緒的解析結果:-938473891
1471866229008-Parser-7執行緒的解析結果:-396620018
1471866229008-Parser-2執行緒的解析結果:-1159985406
1471866229024-Parser-5執行緒的解析結果:-664234808
1471866229070-Parser-6執行緒的解析結果:556534377
1471866229117-Parser-9執行緒的解析結果:-844558478
1471866229383-Parser-0執行緒的解析結果:919864023
1471866229430-Parser-8執行緒的解析結果:-2104111089
所有執行緒解析結束!
所有執行緒的解析結果:-78376279
1471866229430-Parser-8執行緒越過屏障!
1471866229430-Parser-2執行緒越過屏障!
1471866229430-Parser-9執行緒越過屏障!
1471866229430-Parser-7執行緒越過屏障!
1471866229430-Parser-1執行緒越過屏障!
1471866229430-Parser-3執行緒越過屏障!
1471866229430-Parser-0執行緒越過屏障!
1471866229430-Parser-6執行緒越過屏障!
1471866229430-Parser-4執行緒越過屏障!
1471866229430-Parser-5執行緒越過屏障!
我們發現,各個執行緒解析完成的時間不一致,但是越過屏障的時間卻是一致的。

CyclicBarrier和CountDownLatch的區別

CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為複雜的業務場景。例如,如果計算髮生錯誤,可以重置計數器,並讓執行緒重新執行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的執行緒數量。isBroken()方法用來了解阻塞的執行緒是否被中斷。

控制併發執行緒數的Semaphore

Semaphore(訊號量)是用來控制同時訪問特定資源的執行緒數量,它通過協調各個執行緒,以保證合理的使用公共資源。
多年以來,我都覺得從字面上很難理解Semaphore所表達的含義,只能把它比作是控制流量的紅綠燈。比如××馬路要限制流量,只允許同時有一百輛車在這條路上行使,其他的都必須在路口等待,所以前一百輛車會看到綠燈,可以開進這條馬路,後面的車會看到紅燈,不能駛入××馬路,但是如果前一百輛中有5輛車已經離開了××馬路,那麼後面就允許有5輛車駛入馬路,這個例子裡說的車就是執行緒,駛入馬路就表示執行緒在執行,離開馬路就表示執行緒執行完成,看見紅燈就表示執行緒被阻塞,不能執行。

應用場景

Semaphore可以用於做流量控制,特別是公用資源有限的應用場景,比如資料庫連線。假如有一個需求,要讀取幾萬個檔案的資料,因為都是IO密集型任務,我們可以啟動幾十個執行緒併發地讀取,但是如果讀到記憶體後,還需要儲存到資料庫中,而資料庫的連線數只有10個,這時我們必須控制只有10個執行緒同時獲取資料庫連線儲存資料,否則會報錯無法獲取資料庫連線。這個時候,就可以使用Semaphore來做流量控制,如程式碼清單8-7所示。

 

 
  1. import java.util.concurrent.ExecutorService;

  2. import java.util.concurrent.Executors;

  3. import java.util.concurrent.Semaphore;

  4.  
  5. public class SemaphoreTest {

  6. private static final int THREAD_COUNT = 30;

  7. private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);

  8. private static Semaphore s = new Semaphore(10);

  9.  
  10. public static void main(String[] args) {

  11. for (int i = 0; i < THREAD_COUNT; i++) {

  12. threadPool.execute(new Runnable() {

  13. @Override

  14. public void run() {

  15. try {

  16. s.acquire();

  17. System.out.println("save data");

  18. s.release();

  19. } catch (InterruptedException e) {

  20. }

  21. }

  22. });

  23. }

  24. threadPool.shutdown();

  25. }

  26. }

在程式碼中,雖然有30個執行緒在執行,但是隻允許10個併發執行。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore(10)表示允許10個執行緒獲取許可證,也就是最大併發數是10。Semaphore的用法也很簡單,首先執行緒使用Semaphore的acquire()方法獲取一個許可證,使用完之後呼叫release()方法歸還許可證。還可以
用tryAcquire()方法嘗試獲取許可證。

 

其他方法

Semaphore還提供一些其他方法,具體如下。
int availablePermits():返回此訊號量中當前可用的許可證數。
int getQueueLength():返回正在等待獲取許可證的執行緒數。
boolean hasQueuedThreads():是否有執行緒正在等待獲取許可證。
void reducePermits(int reduction):減少reduction個許可證,是個protected方法。
Collection getQueuedThreads():返回所有等待獲取許可證的執行緒集合,是個protected方法。

執行緒間交換資料的Exchanger

Exchanger(交換者)是一個用於執行緒間協作的工具類。Exchanger用於進行執行緒間的資料交換。它提供一個同步點,在這個同步點,兩個執行緒可以交換彼此的資料。這兩個執行緒通過exchange方法交換資料,如果第一個執行緒先執行exchange()方法,它會一直等待第二個執行緒也執行exchange方法,當兩個執行緒都到達同步點時,這兩個執行緒就可以交換資料,將本執行緒生產出來的資料傳遞給對方。
下面來看一下Exchanger的應用場景。
1、Exchanger可以用於遺傳演算法,遺傳演算法裡需要選出兩個人作為交配物件,這時候會交換兩人的資料,並使用交叉規則得出2個交配結果。

2、Exchanger也可以用於校對工作,比如我們需要將紙製銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,採用AB崗兩人進行錄入,錄入到Excel之後,系統需要載入這兩個Excel,並對兩個Excel資料進行校對,看看是否錄入一致.

 

 
  1. import java.util.concurrent.Exchanger;

  2. import java.util.concurrent.ExecutorService;

  3. import java.util.concurrent.Executors;

  4.  
  5. public class ExchangerTest {

  6.  
  7. private static final Exchanger<String> exgr = new Exchanger<String>();

  8. private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

  9.  
  10. public static void main(String[] args) {

  11. threadPool.execute(new Runnable() {

  12. @Override

  13. public void run() {

  14. try {

  15. String A = "銀行流水100";// A錄入銀行流水資料

  16. String B=exgr.exchange(A);

  17. System.out.println("A的視角:A和B資料是否一致:" + A.equals(B) +

  18. ",A錄入的是:" + A + ",B錄入是:" + B);

  19. } catch (InterruptedException e) {

  20. }

  21. }

  22. });

  23. threadPool.execute(new Runnable() {

  24. @Override

  25. public void run() {

  26. try {

  27. String B = "銀行流水200";// B錄入銀行流水資料

  28. String A = exgr.exchange(B);

  29. System.out.println("B的視角:A和B資料是否一致:" + A.equals(B) +

  30. ",A錄入的是:" + A + ",B錄入是:" + B);

  31. } catch (InterruptedException e) {

  32. }

  33. }

  34. });

  35. threadPool.shutdown();

  36. }

  37. }

 

輸出:

B的視角:A和B資料是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200
A的視角:A和B資料是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200

如果兩個執行緒有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設定最大等待時長。