並發編程常用工具類之countDownLatch和cyclicBarrier的使用對比
1.CountDownLatch
countDownLatch的作用是讓一組線程等待其他線程完成工作以後在執行,相當於加強版的join(不懂可以百度一下join的用法),一般在初始化的時候會在構造方法傳入計數器,
後續,在其他線程中每次調用countDown方法計數器減一,一般在需要等待的線程中調用countDownLatch的await方法阻塞線程,在當計數器為0時,等待線程繼續運行。
光看上面的定義描述不是很直觀,我們再來結合代碼看一下實際運用:
1 public class UseCountDownLatch { 2 3 staticCountDownLatch latch = new CountDownLatch(6); 4 //初始化線程(只有一步,有4個) 5 private static class InitThread implements Runnable{ 6 7 @Override 8 public void run() { 9 System.out.println("Thread_"+Thread.currentThread().getId() 10 +" ready init work......");11 latch.countDown();//初始化線程完成工作了,countDown方法只扣減一次; 12 for(int i =0;i<2;i++) { 13 System.out.println("Thread_"+Thread.currentThread().getId() 14 +" ........continue do its work"); 15 } 16 } 17 } 18 //業務線程 19 privatestatic class BusiThread implements Runnable{ 20 21 @Override 22 public void run() { 23 try { 24 latch.await(); 25 } catch (InterruptedException e) { 26 e.printStackTrace(); 27 } 28 for(int i =0;i<3;i++) { 29 System.out.println("BusiThread_"+Thread.currentThread().getId() 30 +" do business-----"); 31 } 32 } 33 } 34 35 public static void main(String[] args) throws InterruptedException { 36 //單獨的初始化線程,初始化分為2步,需要扣減兩次 37 new Thread(new Runnable() { 38 @Override 39 public void run() { 40 SleepTools.ms(1); 41 System.out.println("Thread_"+Thread.currentThread().getId() 42 +" ready init work step 1st......"); 43 latch.countDown();//每完成一步初始化工作,扣減一次 44 System.out.println("begin step 2nd......."); 45 SleepTools.ms(1); 46 System.out.println("Thread_"+Thread.currentThread().getId() 47 +" ready init work step 2nd......"); 48 latch.countDown();//每完成一步初始化工作,扣減一次 49 } 50 }).start(); 51 new Thread(new BusiThread()).start(); 52 for(int i=0;i<=3;i++){ 53 Thread thread = new Thread(new InitThread()); 54 thread.start(); 55 } 56 57 latch.await(); 58 System.out.println("Main do ites work........"); 59 } 60 }
運行結果可以看到,兩個初始化線程先跑,當兩個初始化線程跑完了,latch的計數器減為0,阻塞放開,主線程和業務線程繼續往下運行。但是,在設計這部分算法的時候需要註意,有可能會出現計數器沒有減為0,則線程一直阻塞,導致程序卡死。
countDownLatch一般使用在多線程並發之後需要對結果進行處理,而我們無法控制所有線程的執行時間,所以在這裏加上阻塞,等到只有線程全部執行完。
2.CyclicBarrier
cyclicBarrier從業務角度來說,和countDownLatch比較類似(具體對比後面會專門介紹),其作用類似一個屏障(英文中也是屏障的意思),阻隔線程直到全部到達屏障點,放開屏障。一般可以用在多線程統計的時候,
從代碼角度來看,cyclicBarrier有兩個構造方法,CyclicBarrier(int parties)和CyclicBarrier(int parties, Runnable barrierAction);
兩個方法的區別是第一個只是記錄了線程計數器的個數,而第二個不僅記錄了計數器,當屏障放開時,會執行第二個參數線程的方法(第二個參數一般傳入的是一個實現了Runnable的線程方法)
再來結合代碼深入了解一下:
1 public class UseCyclicBarrier { 2 3 private static CyclicBarrier barrier 4 = new CyclicBarrier(5,new CollectThread()); 5 6 private static ConcurrentHashMap<String,Long> resultMap 7 = new ConcurrentHashMap<>();//存放子線程工作結果的容器 8 9 public static void main(String[] args) { 10 new Thread(){ 11 @Override 12 public void run() { 13 long id = Thread.currentThread().getId();//線程本身的處理結果 14 resultMap.put(Thread.currentThread().getId()+"",id); 15 Random r = new Random();//隨機決定工作線程的是否睡眠 16 try { 17 if(r.nextBoolean()) { 18 Thread.sleep(2000+id); 19 System.out.println("Thread_"+id+" ....do something "); 20 } 21 System.out.println(id+"....is await"); 22 barrier.await(); 23 Thread.sleep(1000+id); 24 System.out.println("Thread_"+id+" ....do its business "); 25 } catch (Exception e) { 26 e.printStackTrace(); 27 } 28 } 29 }.start(); 30 for(int i=0;i<=3;i++){ 31 Thread thread = new Thread(new SubThread()); 32 thread.start(); 33 } 34 } 35 36 //負責屏障開放以後的工作 37 private static class CollectThread implements Runnable{ 38 39 @Override 40 public void run() { 41 StringBuilder result = new StringBuilder(); 42 for(Map.Entry<String,Long> workResult:resultMap.entrySet()){ 43 result.append("["+workResult.getValue()+"]"); 44 } 45 System.out.println(" the result = "+ result); 46 System.out.println("do other business........"); 47 } 48 } 49 50 //工作線程 51 private static class SubThread implements Runnable{ 52 53 @Override 54 public void run() { 55 long id = Thread.currentThread().getId();//線程本身的處理結果 56 resultMap.put(Thread.currentThread().getId()+"",id); 57 Random r = new Random();//隨機決定工作線程的是否睡眠 58 try { 59 if(r.nextBoolean()) { 60 Thread.sleep(2000+id); 61 System.out.println("Thread_"+id+" ....do something "); 62 } 63 System.out.println(id+"....is await"); 64 barrier.await(); 65 Thread.sleep(1000+id); 66 System.out.println("Thread_"+id+" ....do its business "); 67 } catch (Exception e) { 68 e.printStackTrace(); 69 } 70 } 71 } 72 }
輸出結果: 11....is await
12....is await
Thread_10 ....do something
10....is await
Thread_13 ....do something
13....is await
Thread_14 ....do something
14....is await
the result = [11][12][13][14][10]
do other business........
Thread_10 ....do its business
Thread_11 ....do its business
Thread_12 ....do its business
Thread_13 ....do its business
Thread_14 ....do its business
3.countDownLatch和cyclicBarrier區別
(1)countDownlatch的計數器由調用countDown方法次數決定,每次調用計數器減一,可以在一個線程中調用多次,而cyclicBarrier計數器取決調用await方法的線程個數。
(2) countDownLatch只能用一次,而cyclicBarrier可以循環使用。並且cyclicBarrier可以調用reset方法重置計數器,可以在線程故障重新啟用線程調用。
(3) 二者在內部方法也有很多區別,具體有興趣的可以去看看源碼。
並發編程常用工具類之countDownLatch和cyclicBarrier的使用對比