Java學習筆記—多線程(並發工具類,java.util.concurrent.atomic包)
在JDK的並發包裏提供了幾個非常有用的並發工具類。CountDownLatch、CyclicBarrier和Semaphore工具類提供了一種並發流程控制的手段,Exchanger工具類則提供了在線程間交換數據的一種手段。本章會配合一些應用場景來介紹如何使用這些工具類。
CountDownLatch
CountDownLatch允許一個或多個線程等待其他線程完成操作。
假如有這樣一個需求:我們需要解析一個Excel裏多個sheet的數據,此時可以考慮使用多線程,每個線程解析一個sheet裏的數據,等到所有的sheet都解析完之後,程序需要提示解析完成(或者匯總結果)。在這個需求中,要實現主線程等待所有線程完成sheet的解析操作,最簡單的做法是使用join()方法。
import java.util.Random; import java.util.concurrent.atomic.AtomicInteger; public class JoinCountDownLatchTest { private static Random sr=new Random(47); private static AtomicInteger result=new AtomicInteger(0); private static int threadCount=10; private static class Parser implementsRunnable{ String name; public Parser(String name){ this.name=name; } @Override public void run() { int sum=0; int seed=Math.abs(sr.nextInt()) ; Random r=new Random(47); for(int i=0;i<100;i++){ sum+=r.nextInt(seed); } result.addAndGet(sum); System.out.println(name+"線程的解析結果:"+sum); } } public static void main(String[] args) throws InterruptedException { Thread[] threads=new Thread[threadCount]; for(int i=0;i<threadCount;i++){ threads[i]=new Thread(new Parser("Parser-"+i)); } for(int i=0;i<threadCount;i++){ threads[i].start(); } for(int i=0;i<threadCount;i++){ threads[i].join(); } System.out.println("所有線程解析結束!"); System.out.println("所有線程的解析結果:"+result); } }
輸出:
Parser-1線程的解析結果:-2013585201 Parser-0線程的解析結果:1336321192 Parser-2線程的解析結果:908136818 Parser-5線程的解析結果:-1675827227 Parser-3線程的解析結果:1638121055 Parser-4線程的解析結果:1513365118 Parser-6線程的解析結果:489607354 Parser-8線程的解析結果:1513365118 Parser-7線程的解析結果:-1191966831 Parser-9線程的解析結果:-912399159 所有線程解析結束! 所有線程的解析結果:1605138237
join用於讓當前執行線程等待join線程執行結束。其實現原理是不停檢查join線程是否存活,如果join線程存活則讓當前線程永遠等待。
在JDK 1.5之後的並發包中提供的CountDownLatch也可以實現join的功能,並且比join的功能更多。
import java.util.Random; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; public class CountDownLatchTest { private static Random sr=new Random(47); private static AtomicInteger result=new AtomicInteger(0); private static int threadCount=10;//線程數量 private static CountDownLatch countDown=new CountDownLatch(threadCount);//CountDownLatch private static class Parser implements Runnable{ String name; public Parser(String name){ this.name=name; } @Override public void run() { int sum=0; int seed=Math.abs(sr.nextInt()) ; Random r=new Random(47); for(int i=0;i<100;i++){ sum+=r.nextInt(seed); } result.addAndGet(sum); System.out.println(name+"線程的解析結果:"+sum); countDown.countDown();//註意這裏 } } public static void main(String[] args) throws InterruptedException { Thread[] threads=new Thread[threadCount]; for(int i=0;i<threadCount;i++){ threads[i]=new Thread(new Parser("Parser-"+i)); } for(int i=0;i<threadCount;i++){ threads[i].start(); } /* for(int i=0;i<threadCount;i++){ threads[i].join(); }*/ countDown.await();//將join改為使用CountDownLatch System.out.println("所有線程解析結束!"); System.out.println("所有線程的解析結果:"+result); } }
輸出:
Parser-0線程的解析結果:1336321192 Parser-1線程的解析結果:-2013585201 Parser-2線程的解析結果:-1675827227 Parser-4線程的解析結果:1638121055 Parser-3線程的解析結果:908136818 Parser-5線程的解析結果:1513365118 Parser-7線程的解析結果:489607354 Parser-6線程的解析結果:1513365118 Parser-8線程的解析結果:-1191966831 Parser-9線程的解析結果:-912399159 所有線程解析結束! 所有線程的解析結果:1605138237
CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,這裏就傳入N。
當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。由於countDown方法可以用在任何地方,所以這裏說的N個點,可以是N個線程,也可以是1個線程裏的N個執行步驟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程裏即可。
如果有某個解析sheet的線程處理得比較慢,我們不可能讓主線程一直等待,所以可以使用另外一個帶指定時間的await方法——await(long time,TimeUnit unit),這個方法等待特定時間後,就會不再阻塞當前線程。join也有類似的方法。
註意:計數器必須大於等於0,只是等於0時候,計數器就是零,調用await方法時不會阻塞當前線程。CountDownLatch不可能重新初始化或者修改CountDownLatch對象的內部計數器的值。一個線程調用countDown方法happen-before,另外一個線程調用await方法。
CyclicBarrier
CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最後一個線程到達屏障時,屏障才會
開門,所有被屏障攔截的線程才會繼續運行。當所有等待線程都被釋放以後,CyclicBarrier可以被重用。CyclicBarrier類位於java.util.concurrent包下,CyclicBarrier提供2個構造器:
public CyclicBarrier(int parties, Runnable barrierAction) { } public CyclicBarrier(int parties) { }
參數parties指讓多少個線程或者任務等待至barrier狀態;參數barrierAction為當這些線程都達到barrier狀態時會執行的內容
public class Test { public static void main(String[] args) { int N = 4; CyclicBarrier barrier = new CyclicBarrier(N); for(int i=0;i<N;i++) new Writer(barrier).start(); } static class Writer extends Thread{ private CyclicBarrier cyclicBarrier; public Writer(CyclicBarrier cyclicBarrier) { this.cyclicBarrier = cyclicBarrier; } @Override public void run() { System.out.println("線程"+Thread.currentThread().getName()+"正在寫入數據..."); try { Thread.sleep(5000); //以睡眠來模擬寫入數據操作 System.out.println("線程"+Thread.currentThread().getName()+"寫入數據完畢,等待其他線程寫入完畢"); cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); }catch(BrokenBarrierException e){ e.printStackTrace(); } System.out.println("所有線程寫入完畢,繼續處理其他任務..."); } } }
執行結果:
線程Thread-0正在寫入數據... 線程Thread-3正在寫入數據... 線程Thread-2正在寫入數據... 線程Thread-1正在寫入數據... 線程Thread-2寫入數據完畢,等待其他線程寫入完畢 線程Thread-0寫入數據完畢,等待其他線程寫入完畢 線程Thread-3寫入數據完畢,等待其他線程寫入完畢 線程Thread-1寫入數據完畢,等待其他線程寫入完畢 所有線程寫入完畢,繼續處理其他任務... 所有線程寫入完畢,繼續處理其他任務... 所有線程寫入完畢,繼續處理其他任務... 所有線程寫入完畢,繼續處理其他任務...
CyclicBarrier和CountDownLatch的區別
CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置。所以CyclicBarrier能處理更為復雜的業務場景。例如,如果計算發生錯誤,可以重置計數器,並讓線程重新執行一次。
CyclicBarrier還提供其他有用的方法,比如getNumberWaiting方法可以獲得Cyclic-Barrier阻塞的線程數量。isBroken()方法用來了解阻塞的線程是否被中斷。
Semaphore
Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。Semaphore可以控同時訪問的線程個數,通過acquire()獲取一個許可,如果沒有就等待,而 release() 釋放一個許可。
假若一個工廠有5臺機器,但是有8個工人,一臺機器同時只能被一個工人使用,只有使用完了,其他工人才能繼續使用。那麽我們就可以通過Semaphore來實現:
public class Test { public static void main(String[] args) { int N = 8; //工人數 Semaphore semaphore = new Semaphore(5); //機器數目 for(int i=0;i<N;i++) new Worker(i,semaphore).start(); } static class Worker extends Thread{ private int num; private Semaphore semaphore; public Worker(int num,Semaphore semaphore){ this.num = num; this.semaphore = semaphore; } @Override public void run() { try { semaphore.acquire(); System.out.println("工人"+this.num+"占用一個機器在生產..."); Thread.sleep(2000); System.out.println("工人"+this.num+"釋放出機器"); semaphore.release(); } catch (InterruptedException e) { e.printStackTrace(); } } } }
執行結果:
工人0占用一個機器在生產...
工人1占用一個機器在生產...
工人2占用一個機器在生產...
工人4占用一個機器在生產...
工人5占用一個機器在生產...
工人0釋放出機器
工人2釋放出機器
工人3占用一個機器在生產...
工人7占用一個機器在生產...
工人4釋放出機器
工人5釋放出機器
工人1釋放出機器
工人6占用一個機器在生產...
工人3釋放出機器
工人7釋放出機器
工人6釋放出機器
Exchanger
Exchanger(交換者)是一個用於線程間協作的工具類。Exchanger用於進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。
下面來看一下Exchanger的應用場景。
1、Exchanger可以用於遺傳算法,遺傳算法裏需要選出兩個人作為交配對象,這時候會交換兩人的數據,並使用交叉規則得出2個交配結果。
2、Exchanger也可以用於校對工作,比如我們需要將紙制銀行流水通過人工的方式錄入成電子銀行流水,為了避免錯誤,采用AB崗兩人進行錄入,錄入到Excel之後,系統需要加載這兩個Excel,並對兩個Excel數據進行校對,看看是否錄入一致.
import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<String>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { try { String A = "銀行流水100";// A錄入銀行流水數據 String B=exgr.exchange(A); System.out.println("A的視角:A和B數據是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B錄入是:" + B); } catch (InterruptedException e) { } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String B = "銀行流水200";// B錄入銀行流水數據 String A = exgr.exchange(B); System.out.println("B的視角:A和B數據是否一致:" + A.equals(B) + ",A錄入的是:" + A + ",B錄入是:" + B); } catch (InterruptedException e) { } } }); threadPool.shutdown(); } }
輸出:
B的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200 A的視角:A和B數據是否一致:false,A錄入的是:銀行流水100,B錄入是:銀行流水200
如果兩個線程有一個沒有執行exchange()方法,則會一直等待,如果擔心有特殊情況發生,避免一直等待,可以使用exchange(V x,longtimeout,TimeUnit unit)設置最大等待時長。
參考:
Java並發工具類詳解
Java並發編程-原子類及並發工具類
Java學習筆記—多線程(並發工具類,java.util.concurrent.atomic包)