java.util.concurrent包下的幾個常用類
1.Callable<V>
Callable<V>與Runnable類似,理解Callable<V>可以從比較其與Runnable的區別開始:
1)從使用上:實現的Callable<V>的類需要實現call()方法,此方法有返回物件V;而Runnable的子類需要實現run()方法,但沒有返回值;
2)如果直接呼叫Callable<V>的子類的call()方法,程式碼是同步順序執行的;而Runnable的子類是執行緒,是程式碼非同步執行。
3)將Callable子類submit()給執行緒池去執行,那麼在時間上幾個Callable的子類的執行是非同步的。
即:如果一個Callable執行需要5s,那麼直接呼叫Callable.call(),執行3次需要15s;
而將Callable子類交個執行緒執行3次,在池可用的情況下,只需要5s。這就是基本的將任務拆分非同步執行的做法。
4)callable與future的組合用法:
(什麼是Future?Future 表示非同步計算的結果。其用於獲取執行緒池執行callable後的結果,這個結果封裝為Future類。詳細可以參看Future的API,有示例。)
一種就像上面所說,對一個大任務進行分制處理;
另一種就是對一個任務的多種實現方法共同執行,任何一個返回計算結果,則其他的任務就沒有執行的必要。選取耗時最少的結果執行。
2.Semaphore
一個計數訊號量,主要用於控制多執行緒對共同資源庫訪問的限制。
典型的例項:1)公共廁所的蹲位……,10人等待5個蹲位的測試,滿員後就只能出一個進一個。
2)地下車位,要有空餘才能放行
3)共享檔案IO數等
與執行緒池的區別:執行緒池是控制執行緒的數量,訊號量是控制共享資源的併發訪問量。
例項:Semaphore avialable = new Semaphore(int x,boolean y);
x:可用資源數;y:公平競爭或非公平競爭(公平競爭會導致排隊,等待最久的執行緒先獲取資源)
用法:在獲取工作資源前,用Semaphore.acquire()獲取資源,如果資源不可用則阻塞,直到獲取資源;操作完後,用Semaphore.release()歸還資源
程式碼示例:(具體管理資源池的示例,可以參考API的示例)
- public class SemaphoreTest {
- private static final int NUMBER = 5; //限制資源訪問數
- private static final Semaphore avialable = new Semaphore(NUMBER,true);
- public static void main(String[] args) {
- ExecutorService pool = Executors.newCachedThreadPool();
- Runnable r = new
- public void run(){
- try {
- avialable.acquire(); //此方法阻塞
- Thread.sleep(10*1000);
- System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--執行完畢");
- avialable.release();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- System.out.println(avialable.availablePermits());
- for(int i=0;i<10;i++){
- pool.execute(r);
- }
- System.out.println(avialable.availablePermits());
- pool.shutdown();
- }
- public static String getNow(){
- SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
- return sdf.format(new Date());
- }
- }
3.ReentrantLock與Condition
1.ReentrantLock:可重入互斥鎖。使用上與synchronized關鍵字對比理解:
1.1)synchronized示例:
- synchronized(object){
- //do process to object
- }
1.2)ReentrantLock示例:(api)
- private final ReentrantLock lock = new ReentrantLock();
- public void m() {
- lock.lock(); // block until condition holds
- try {
- // ... method body
- } finally {
- lock.unlock()
- }
- }
由1.1)和1.2)的示例很好理解,ReetantLock也就是一個鎖,執行緒執行某段程式碼時,需要爭用此類例項的鎖,用完後要顯示的釋放此鎖。
至於具體區別,後面在說……
2.Condition:此類是同步的條件物件,每個Condition例項繫結到一個ReetrantLock中,以便爭用同一個鎖的多執行緒之間可以通過Condition的狀態來獲取通知。
注意:使用Condition前,首先要獲得ReentantLock,當條件不滿足執行緒1等待時,ReentrantLock會被釋放,以能讓其他執行緒爭用,其他執行緒獲得reentrantLock,然後滿足條件,喚醒執行緒1繼續執行。
這與wait()方法是一樣的,呼叫wait()的程式碼塊要被包含在synchronized塊中,而當執行緒r1呼叫了objectA.wait()方法後,同步物件的鎖會釋放,以能讓其他執行緒爭用;其他執行緒獲取同步物件鎖,完成任務,呼叫objectA.notify(),讓r1繼續執行。程式碼示例如下。
程式碼示例1(呼叫condition.await();會釋放lock鎖):
- public class ConditionTest {
- private static final ReentrantLock lock = new ReentrantLock(true);
- //從鎖中建立一個繫結條件
- private static final Condition condition = lock.newCondition();
- private static int count = 1;
- public static void main(String[] args) {
- Runnable r1 = new Runnable(){
- public void run(){
- lock.lock();
- try{
- while(count<=5){
- System.out.println(Thread.currentThread().getName()+"--"+count++);
- Thread.sleep(1000);
- }
- condition.signal(); //執行緒r1釋放條件訊號,以喚醒r2中處於await的程式碼繼續執行。
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
- };
- Runnable r2 = new Runnable(){
- public void run(){
- lock.lock();
- try{
- if(count<=5){
- System.out.println("----$$$---");
- condition.await(); //但呼叫await()後,lock鎖會被釋放,讓執行緒r1能獲取到,與Object.wait()方法一樣
- System.out.println("----------");
- }
- while(count<=10){
- System.out.println(Thread.currentThread().getName()+"--"+count++);
- Thread.sleep(1000);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }finally{
- lock.unlock();
- }
- }
- };
- new Thread(r2).start(); //讓r2先執行,先獲得lock鎖,但條件不滿足,讓r2等待await。
- try {
- Thread.sleep(100); //這裡休眠主要是用於測試r2.await()會釋放lock鎖,被r1獲取
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- new Thread(r1).start();
- }
- }
程式碼示例2(此例子來自於Condition的API):
- public class ConditionMain {
- public static void main(String[] args) {
- final BoundleBuffer buf = new ConditionMain().new BoundleBuffer();
- new Thread(new Runnable(){
- public void run() {
- for(int i=0;i<1000;i++){
- try {
- buf.put(i);
- System.out.println("入值:"+i);
- Thread.sleep(200);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- new Thread(new Runnable(){
- public void run() {
- for(int i=0;i<1000;i++){
- try {
- int x = buf.take();
- System.out.println("出值:"+x);
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- }
- }).start();
- }
- public class BoundleBuffer {
- final Lock lock = new ReentrantLock();
- final Condition notFull = lock.newCondition();
- final Condition notEmpty = lock.newCondition();
- final Integer[] items = new Integer[10];
- int putptr, takeptr, count;
- public void put(int x) throws InterruptedException {
- System .out.println("put wait lock");
- lock.lock();
- System .out.println("put get lock");
- try {
- while (count == items.length){
- System.out.println("buffer full, please wait");
- notFull.await();
- }
- items[putptr] = x;
- if (++putptr == items.length)
- putptr = 0;
- ++count;
- notEmpty.signal();
- } finally {
- lock.unlock();
- }
- }
- public int take() throws InterruptedException {
- System .out.println("take wait lock");
- lock.lock();
- System .out.println("take get lock");
- try {
- while (count == 0){
- System.out.println("no elements, please wait");
- notEmpty.await();
- }
- int x = items[takeptr];
- if (++takeptr == items.length)
- takeptr = 0;
- --count;
- notFull.signal();
- return x;
- } finally {
- lock.unlock();
- }
- }
- }
- }
4.BlockingQueue
簡單介紹。這是一個阻塞的佇列超類介面,concurrent包下很多架構都基於這個佇列。BlockingQueue是一個介面,此介面的實現類有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 。每個類的具體使用可以參考API。
這些實現類都遵從共同的介面定義(一目瞭然,具體參考api):
- 丟擲異常 特殊值 阻塞 超時
- 插入 add(e) offer(e) put(e) offer(e, time, unit)
- 移除 remove() poll() take() poll(time, unit)
- 檢查 element() peek() 不可用 不可用
5.CompletionService
1.CompletionService是一個介面,用來儲存一組非同步求解的任務結果集。api的解釋是:將新生產的非同步任務與已完成的任務結果集分離開來。
2.CompletionService依賴於一個特定的Executor來執行任務。實際就是此介面需要多執行緒處理一個共同的任務,這些多執行緒由一個指定的執行緒池來管理。CompletionService的實現類ExecutorCompletionService。
3.api的官方程式碼示例參考ExecutorCompletionService類的api(一個通用分制概念的函式)。
4.使用示例:如有時我們需要一次插入大批量資料,那麼可能我們需要將1w條資料分開插,非同步執行。如果某個非同步任務失敗那麼我們還要重插,那可以用CompletionService來實現。下面是簡單程式碼:
(程式碼中1w條資料分成10份,每次插1000條,如果成功則返回true,如果失敗則返回false。那麼忽略資料庫的東西,我們假設插1w條資料需10s,插1k條資料需1s,那麼下面的程式碼分制後,插入10條資料需要2s。為什麼是2s呢?因為我們開的執行緒池是8執行緒,10個非同步任務就有兩個需要等待池資源,所以是2s,如果將下面的8改為10,則只需要1s。)
- public class CompletionServiceTest {
- public static void main(String[] args) {
- ExecutorService pool = Executors.newFixedThreadPool(8); //需要2s,如果將8改成10,則只需要1s
- CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(pool);
- Callable<Boolean> task = new Callable<Boolean>(){
- public Boolean call(){
- try {
- Thread.sleep(1000);
- System.out.println("插入1000條資料完成");
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return true;
- };
- };
- System.out.println(getNow()+"--開始插入資料");
- for(int i=0;i<10;i++){
- cs.submit(task);
- }
- for(int i=0;i<10;i++){
- try {
- //ExecutorCompletionService.take()方法是阻塞的,如果當前沒有完成的任務則阻塞
- System.out.println(cs.take().get());
- //實際使用時,take()方法獲取的結果可用於處理,如果插入失敗,則可以進行重試或記錄等操作
- } catch (InterruptedException|ExecutionException e) {
- e.printStackTrace();
- }
- }
- System.out.println(getNow()+"--插入資料完成");
- pool.shutdown();
- }
- public static String getNow(){
- SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
- return sdf.format(new Date());
- }
- }
5.CompletionService與Callable<V>+Future的對比:
在上面的Callable中說過,Callable+Future能實現任務的分治,但是有個問題就是:不知道call()什麼時候完成,需要人為控制等待。
而jdk通過CompetionService已經將此麻煩簡化,通過CompletionService將非同步任務完成的與未完成的區分開來(正如api的描述),我們只用去取即可。
CompletionService有什麼好處呢?
如上所說:1)將已完成的任務和未完成的任務分開了,無需開發者操心;2)隱藏了Future類,簡化了程式碼的使用。真想點個贊!
6.CountDownLatch
1.CountDownLatch:api解釋:一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。個人理解是CountDownLatch讓可以讓一組執行緒同時執行,然後在這組執行緒全部執行完前,可以讓另一個執行緒等待。
就好像跑步比賽,10個選手依次就位,哨聲響才同時出發;所有選手都通過終點,才能公佈成績。那麼CountDownLatch就可以控制10個選手同時出發,和公佈成績的時間。
CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過呼叫 countDown() 的執行緒開啟入口前,所有呼叫 await 的執行緒都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個執行緒在 N 個執行緒完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);
程式碼示例可參考api的示例。(重要)
2.程式碼示例:
個人示例:
- public class CountDownLatchTest {
- private static SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
- public static void main(String[] args) {
- final CountDownLatch start = new CountDownLatch(1); //用一個訊號控制一組執行緒的開始,初始化為1
- final CountDownLatch end = new CountDownLatch(10); //要等待N個執行緒的結束,初始化為N,這裡是10
- Runnable r = new Runnable(){
- public void run(){
- try {
- start.await(); //阻塞,這樣start.countDown()到0,所有阻塞在start.await()處的執行緒一起執行
- Thread.sleep((long) (Math.random()*10000));
- System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--執行完成");
- end.countDown();//非阻塞,每個執行緒執行完,讓end--,這樣10個執行緒執行完end倒數到0,主執行緒的end.await()就可以繼續執行
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- }
- };
- for(int i=0;i<10;i++){
- new Thread(r).start(); //雖然開始了10個執行緒,但所有執行緒都阻塞在start.await()處
- }
- System.out.println(getNow()+"--執行緒全部啟動完畢,休眠3s再讓10個執行緒一起執行");
- try {
- Thread.sleep(3*1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(getNow()+"--開始");
- start.countDown(); //start初始值為1,countDown()變成0,觸發10個執行緒一起執行
- try {
- end.await(); //阻塞,等10個執行緒都執行完了才繼續往下。
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(getNow()+"--10個執行緒都執行完了,主執行緒繼續往下執行!");
- }
- private static String getNow(){
- return sdf.format(new Date());
- }
- }
7.CyclicBarrier
1.一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點。也就是說,這一組執行緒的執行分幾個節點,每個節點往下執行,都需等待其他執行緒,這就需要這種等待具有迴圈性。CyclicBarrier在這樣的情況下就很有用。
2.CyclicBarrier與CountDownLacth的區別:
1)CountDownLacth用於一個執行緒與一組執行緒之間的相互等待。常用的就是一個主執行緒與一組分治執行緒之間的等待:主執行緒發號令,一組執行緒同時執行;一組執行緒依次執行完,再喚醒主執行緒繼續執行;
CyclicBarrier用於一組執行緒執行時,每個執行緒執行有多個節點,每個節點的處理需要相互等待。如:對5個檔案進行處理,按行將各個檔案數字挑出來合併成一行,排序,並輸出到另一個檔案,那每次處理都需要等待5個執行緒讀入下一行。(api示例可供參考)
2)CountDownLacth的處理機制是:初始化一個值N(相當於一組執行緒有N個),每個執行緒呼叫一次countDown(),那麼cdLatch減1,等所有執行緒都呼叫過countDown(),那麼cdLatch值達到0,那麼執行緒從await()處接著玩下執行。
CyclicBarrier的處理機制是:初始化一個值N(相當於一組執行緒有N個),每個執行緒呼叫一次await(),那麼barrier加1,等所有執行緒都呼叫過await(),那麼barrier值達到初始值N,所有執行緒接著往下執行,並將barrier值重置為0,再次迴圈下一個屏障;
3)由2)可以知道,CountDownLatch只可以使用一次,而CyclicBarrier是可以迴圈使用的。
3.個人用於理解的示例:
- public class CyclicBarrierTest {
- private static final CyclicBarrier barrier = new CyclicBarrier(5,
- new Runnable(){
- public void run(){ //每次執行緒到達屏障點,此方法都會執行
- System.out.println("\n--------barrier action--------\n");
- }
- });
- public static void main(String[] args) {
- for(int i=0;i<5;i++){
- new Thread(new CyclicBarrierTest().new Worker()).start();
- }
- }
- class Worker implements Runnable{
- public void run(){
- try {
- System.out.println(Thread.currentThread().getName()+"--第一階段");
- Thread.sleep(getRl());
- barrier.await(); //每一次await()都會阻塞,等5個執行緒都執行到這一步(相當於barrier++操作,加到初始化值5),才繼續往下執行
- System.out.println(Thread.currentThread().getName()+"--第二階段");
- Thread.sleep(getRl());
- barrier.await(); //每一次5個執行緒都到達共同的屏障節點,會執行barrier初始化引數中定義的Runnable.run()
- System.out.println(Thread.currentThread().getName()+"--第三階段");
- Thread.sleep(getRl());
- barrier.await();
- System.out.println(Thread.currentThread().getName()+"--第四階段");
- Thread.sleep(getRl());
- barrier.await();
- System.out.println(Thread.currentThread().getName()+"--第五階段");
- Thread.sleep(getRl());
- barrier.await();
- System.out.println(Thread.currentThread().getName()+"--結束");
- } catch (InterruptedException | BrokenBarrierException e) {
- e.printStackTrace();
- }
- }
- }
- public static long getRl(){
- return Math.round(10000);
- }
- }
4.參考api的示例。
api的示例自己看,就是加深印象。
但是api中有一點描述:如果屏障操作在執行時不依賴於正掛起的執行緒,則執行緒組中的任何執行緒在獲得釋放時都能執行該操作。為方便此操作,每次呼叫 await() 都將返回能到達屏障處的執行緒的索引。然後,您可以選擇哪個執行緒應該執行屏障操作,例如:
- if (barrier.await() == 0) {
- <span style="white-space:pre"> </span> // log the completion of this iteration
- }
就是說,barrier.await()還會返回一個int值。這個返回值到底是什麼呢?不是返回的執行緒的索引,返回的是:N-進入等待執行緒數,如5個執行緒,5執行緒都進入等待,那返回值就是0(具體可以參看原始碼)。那麼barrier.await()==0也可以看做是一個N執行緒都達到公共屏障的訊號,然後在此條件下處理原本需要放在Runnable引數中的邏輯。不用擔心多執行緒會多次執行此邏輯,N個執行緒只有一個執行緒barrier.await()==0。
8.Exchanger
1.Exchanger可以在對中對元素進行配對和交換的執行緒的同步點。api上不是太好理解,個人理解說白了就是兩個執行緒交換各自使用的指定記憶體資料。
2.場景:
api中有示例,兩個執行緒A、B,各自有一個數據型別相同的變數a、b,A執行緒往a中填資料(生產),B執行緒從b中取資料(消費)。具體如何讓a、b在記憶體發生關聯,就由Exchanger完成。
api中說:Exchanger 可能被視為 SynchronousQueue 的雙向形式。怎麼理解呢?傳統的SynchronousQueue存取需要同步,就是A放入需要等待B取出,B取出需要等待A放入,在時間上要同步進行。而Exchanger在B取出的時候,A是同步在放入的。即:1)A放入a,a滿,然後與B交換記憶體,那A就可以操作b(b空),而B可以操作a;2)等b被A存滿,a被B用完,再交換;3)那A又填充a,B又消費b,依次迴圈。兩個記憶體在一定程度上是同時被操作的,在時間上不需要同步。
再理解就是:如果生產需要5s,消費需要5s。SynchronousQueue一次存取需要10s,而Exchanger只需要5s。4.注意事項:
目前只知道Exchanger只能發生在兩個執行緒之間。但實際上Exchanger的原始碼是有多個插槽(Slot),交換是通過執行緒ID的hash值來定位的。目前還沒搞懂?待後續。
如果一組執行緒aGroup操作a記憶體,一組執行緒bGroup操作b記憶體,如何交換?能不能交換?
3.程式碼示例:
- public class ExchangerTest {
- private SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");
- private static Exchanger<Queue<Integer>> changer = new Exchanger<Queue<Integer>>();
- public static void main(String[] args) {
- new Thread(new ExchangerTest().new ProducerLoop()).start();
- new Thread(new ExchangerTest().new ConsumerLoop()).start();
- }
- class ProducerLoop implements Runnable{
- private Queue<Integer> pBuffer = new LinkedBlockingQueue<Integer>();
- private final int maxnum = 10;
- @Override
- public void run() {
- try{
- for(;;){
- Thread.sleep(500);
- pBuffer.offer((int) Math.round(Math.random()*100));
- if(pBuffer.size() == maxnum){
- System.out.println(getNow()+"--producer交換前");
- pBuffer = changer.exchange(pBuffer);
- System.out.println(getNow()+"--producer交換後");
- }
- }
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
- class ConsumerLoop implements Runnable{
- private Queue<Integer> cBuffer = new LinkedBlockingQueue<Integer>();
- @Override
- public void run() {
- try{
- for(;;){
- if(cBuffer.size() == 0){
- System.out.println("\n"+getNow()+"--consumer交換前");
- cBuffer = changer.exchange(cBuffer);
- System.out.println(getNow()+"--consumer交換後");
- }
- System.out.print(cBuffer.poll()+" ");
- Thread.sleep(500);
- }
- }catch(Exception e){
- e.printStackTrace();
- }
- }
- }
- private String getNow(){
- return sdf.format(new Date());
- }
- }
4.注意事項:
目前只知道Exchanger只能發生在兩個執行緒之間。但實際上Exchanger的原始碼是有多個插槽(Slot),交換是通過執行緒ID的hash值來定位的。目前還沒搞懂?待後續。
如果一組執行緒aGroup操作a記憶體,一組執行緒bGroup操作b記憶體,如何交換?能不能交換?
9.Phaser
Phaser是jdk1.7的新特性。其功能類似CyclicBarrier和CountDownLatch,但其功能更靈活,更強大,支援動態調整需要控制的執行緒數。不重複了。參考連結: