1. 程式人生 > >java.util.concurrent包下的幾個常用類

java.util.concurrent包下的幾個常用類

1.Callable<V>

Callable<V>與Runnable類似,理解Callable<V>可以從比較其與Runnable的區別開始:

1)從使用上:實現的Callable<V>的類需要實現call()方法,此方法有返回物件V;而Runnable的子類需要實現run()方法,但沒有返回值;
2)如果直接呼叫Callable<V>的子類的call()方法,程式碼是同步順序執行的;而Runnable的子類是執行緒,是程式碼非同步執行。
3)將Callable子類submit()給執行緒池去執行,那麼在時間上幾個Callable的子類的執行是非同步的。
即:如果一個Callable執行需要5s,那麼直接呼叫Callable.call(),執行3次需要15s;
而將Callable子類交個執行緒執行3次,在池可用的情況下,只需要5s。這就是基本的將任務拆分非同步執行的做法。
4)callable與future的組合用法:
(什麼是Future?Future 表示非同步計算的結果。其用於獲取執行緒池執行callable後的結果,這個結果封裝為Future類。詳細可以參看Future的API,有示例。)
一種就像上面所說,對一個大任務進行分制處理;
另一種就是對一個任務的多種實現方法共同執行,任何一個返回計算結果,則其他的任務就沒有執行的必要。選取耗時最少的結果執行。

2.Semaphore

一個計數訊號量,主要用於控制多執行緒對共同資源庫訪問的限制。
典型的例項:1)公共廁所的蹲位……,10人等待5個蹲位的測試,滿員後就只能出一個進一個。
2)地下車位,要有空餘才能放行
3)共享檔案IO數等
與執行緒池的區別:執行緒池是控制執行緒的數量,訊號量是控制共享資源的併發訪問量。
例項:Semaphore avialable = new Semaphore(int x,boolean y);
x:可用資源數;y:公平競爭或非公平競爭(公平競爭會導致排隊,等待最久的執行緒先獲取資源)
用法:在獲取工作資源前,用Semaphore.acquire()獲取資源,如果資源不可用則阻塞,直到獲取資源;操作完後,用Semaphore.release()歸還資源
程式碼示例:(具體管理資源池的示例,可以參考API的示例)

  1. public class SemaphoreTest {  
  2.     private static final int NUMBER = 5;    //限制資源訪問數  
  3.     private static final Semaphore avialable = new Semaphore(NUMBER,true);  
  4.     public static void main(String[] args) {  
  5.         ExecutorService pool = Executors.newCachedThreadPool();  
  6.         Runnable r = new
     Runnable(){  
  7.             public void run(){  
  8.                 try {  
  9.                     avialable.acquire();    //此方法阻塞  
  10.                     Thread.sleep(10*1000);  
  11.                     System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--執行完畢");  
  12.                     avialable.release();  
  13.                 } catch (InterruptedException e) {  
  14.                     e.printStackTrace();  
  15.                 }  
  16.             }  
  17.         };  
  18.         System.out.println(avialable.availablePermits());  
  19.         for(int i=0;i<10;i++){  
  20.             pool.execute(r);  
  21.         }  
  22.         System.out.println(avialable.availablePermits());  
  23.         pool.shutdown();  
  24.     }  
  25.     public static String getNow(){  
  26.         SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  27.         return sdf.format(new Date());  
  28.     }  
  29. }  

3.ReentrantLock與Condition

1.ReentrantLock:可重入互斥鎖。使用上與synchronized關鍵字對比理解:
1.1)synchronized示例:

  1. synchronized(object){  
  2.         //do process to object  
  3.     }  


1.2)ReentrantLock示例:(api)

  1. private final ReentrantLock lock = new ReentrantLock();  
  2.    public void m() {   
  3.      lock.lock();  // block until condition holds  
  4.      try {  
  5.        // ... method body  
  6.      } finally {  
  7.        lock.unlock()  
  8.      }  
  9.    }  


由1.1)和1.2)的示例很好理解,ReetantLock也就是一個鎖,執行緒執行某段程式碼時,需要爭用此類例項的鎖,用完後要顯示的釋放此鎖。
至於具體區別,後面在說……
2.Condition:此類是同步的條件物件,每個Condition例項繫結到一個ReetrantLock中,以便爭用同一個鎖的多執行緒之間可以通過Condition的狀態來獲取通知。
注意:使用Condition前,首先要獲得ReentantLock,當條件不滿足執行緒1等待時,ReentrantLock會被釋放,以能讓其他執行緒爭用,其他執行緒獲得reentrantLock,然後滿足條件,喚醒執行緒1繼續執行。
這與wait()方法是一樣的,呼叫wait()的程式碼塊要被包含在synchronized塊中,而當執行緒r1呼叫了objectA.wait()方法後,同步物件的鎖會釋放,以能讓其他執行緒爭用;其他執行緒獲取同步物件鎖,完成任務,呼叫objectA.notify(),讓r1繼續執行。程式碼示例如下。


程式碼示例1(呼叫condition.await();會釋放lock鎖):

  1. public class ConditionTest {  
  2.     private static final ReentrantLock lock = new ReentrantLock(true);  
  3.     //從鎖中建立一個繫結條件  
  4.     private static final Condition condition = lock.newCondition();  
  5.     private static int count = 1;  
  6.     public static void main(String[] args) {  
  7.         Runnable r1 = new Runnable(){  
  8.             public void run(){  
  9.                 lock.lock();  
  10.                 try{  
  11.                     while(count<=5){  
  12.                         System.out.println(Thread.currentThread().getName()+"--"+count++);  
  13.                         Thread.sleep(1000);  
  14.                     }  
  15.                     condition.signal();     //執行緒r1釋放條件訊號,以喚醒r2中處於await的程式碼繼續執行。  
  16.                 } catch (InterruptedException e) {  
  17.                     e.printStackTrace();  
  18.                 }finally{  
  19.                     lock.unlock();  
  20.                 }  
  21.             }  
  22.         };  
  23.         Runnable r2 = new Runnable(){  
  24.             public void run(){  
  25.                 lock.lock();  
  26.                 try{  
  27.                     if(count<=5){  
  28.                         System.out.println("----$$$---");  
  29.                         condition.await();  //但呼叫await()後,lock鎖會被釋放,讓執行緒r1能獲取到,與Object.wait()方法一樣  
  30.                         System.out.println("----------");  
  31.                     }  
  32.                     while(count<=10){  
  33.                         System.out.println(Thread.currentThread().getName()+"--"+count++);  
  34.                         Thread.sleep(1000);  
  35.                     }  
  36.                 } catch (InterruptedException e) {  
  37.                     e.printStackTrace();  
  38.                 }finally{  
  39.                     lock.unlock();  
  40.                 }  
  41.             }  
  42.         };  
  43.         new Thread(r2).start(); //讓r2先執行,先獲得lock鎖,但條件不滿足,讓r2等待await。  
  44.         try {  
  45.             Thread.sleep(100);  //這裡休眠主要是用於測試r2.await()會釋放lock鎖,被r1獲取  
  46.         } catch (InterruptedException e) {  
  47.             e.printStackTrace();  
  48.         }  
  49.         new Thread(r1).start();  
  50.     }  
  51. }  

程式碼示例2(此例子來自於Condition的API):

  1. public class ConditionMain {  
  2.     public static void main(String[] args) {  
  3.         final BoundleBuffer buf = new ConditionMain().new BoundleBuffer();  
  4.         new Thread(new Runnable(){  
  5.             public void run() {  
  6.                 for(int i=0;i<1000;i++){  
  7.                     try {  
  8.                         buf.put(i);  
  9.                         System.out.println("入值:"+i);  
  10.                         Thread.sleep(200);  
  11.                     } catch (InterruptedException e) {  
  12.                         e.printStackTrace();  
  13.                     }  
  14.                 }  
  15.             }  
  16.         }).start();  
  17.         new Thread(new Runnable(){  
  18.             public void run() {  
  19.                 for(int i=0;i<1000;i++){  
  20.                     try {  
  21.                         int x = buf.take();  
  22.                         System.out.println("出值:"+x);  
  23.                         Thread.sleep(2000);  
  24.                     } catch (InterruptedException e) {  
  25.                         e.printStackTrace();  
  26.                     }  
  27.                 }  
  28.             }  
  29.         }).start();  
  30.     }  
  31.     public class BoundleBuffer {  
  32.         final Lock lock = new ReentrantLock();  
  33.         final Condition notFull = lock.newCondition();  
  34.         final Condition notEmpty = lock.newCondition();  
  35.         final Integer[] items = new Integer[10];  
  36.         int putptr, takeptr, count;  
  37.         public void put(int x) throws InterruptedException {  
  38.             System .out.println("put wait lock");  
  39.             lock.lock();  
  40.             System .out.println("put get lock");  
  41.             try {  
  42.                 while (count == items.length){  
  43.                     System.out.println("buffer full, please wait");  
  44.                     notFull.await();  
  45.                 }  
  46.                 items[putptr] = x;  
  47.                 if (++putptr == items.length)  
  48.                     putptr = 0;  
  49.                 ++count;  
  50.                 notEmpty.signal();  
  51.             } finally {  
  52.                 lock.unlock();  
  53.             }  
  54.         }  
  55.         public int take() throws InterruptedException {  
  56.             System .out.println("take wait lock");  
  57.             lock.lock();  
  58.             System .out.println("take get lock");  
  59.             try {  
  60.                 while (count == 0){  
  61.                     System.out.println("no elements, please wait");  
  62.                     notEmpty.await();  
  63.                 }  
  64.                 int x = items[takeptr];  
  65.                 if (++takeptr == items.length)  
  66.                     takeptr = 0;  
  67.                 --count;  
  68.                 notFull.signal();  
  69.                 return x;  
  70.             } finally {  
  71.                 lock.unlock();  
  72.             }  
  73.         }  
  74.     }  
  75. }  

4.BlockingQueue

簡單介紹。這是一個阻塞的佇列超類介面,concurrent包下很多架構都基於這個佇列。BlockingQueue是一個介面,此介面的實現類有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 。每個類的具體使用可以參考API。

這些實現類都遵從共同的介面定義(一目瞭然,具體參考api):

  1.     丟擲異常    特殊值         阻塞      超時   
  2. 插入  add(e)      offer(e)    put(e)      offer(e, time, unit)   
  3. 移除  remove()    poll()      take()      poll(time, unit)   
  4. 檢查  element()   peek()      不可用         不可用   

5.CompletionService

1.CompletionService是一個介面,用來儲存一組非同步求解的任務結果集。api的解釋是:將新生產的非同步任務與已完成的任務結果集分離開來。
2.CompletionService依賴於一個特定的Executor來執行任務。實際就是此介面需要多執行緒處理一個共同的任務,這些多執行緒由一個指定的執行緒池來管理。CompletionService的實現類ExecutorCompletionService。
3.api的官方程式碼示例參考ExecutorCompletionService類的api(一個通用分制概念的函式)。
4.使用示例:如有時我們需要一次插入大批量資料,那麼可能我們需要將1w條資料分開插,非同步執行。如果某個非同步任務失敗那麼我們還要重插,那可以用CompletionService來實現。下面是簡單程式碼:
(程式碼中1w條資料分成10份,每次插1000條,如果成功則返回true,如果失敗則返回false。那麼忽略資料庫的東西,我們假設插1w條資料需10s,插1k條資料需1s,那麼下面的程式碼分制後,插入10條資料需要2s。為什麼是2s呢?因為我們開的執行緒池是8執行緒,10個非同步任務就有兩個需要等待池資源,所以是2s,如果將下面的8改為10,則只需要1s。)

  1. public class CompletionServiceTest {  
  2.     public static void main(String[] args) {  
  3.         ExecutorService pool = Executors.newFixedThreadPool(8);     //需要2s,如果將8改成10,則只需要1s  
  4.         CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(pool);  
  5.         Callable<Boolean> task = new Callable<Boolean>(){  
  6.             public Boolean call(){  
  7.                 try {  
  8.                     Thread.sleep(1000);  
  9.                     System.out.println("插入1000條資料完成");  
  10.                 } catch (InterruptedException e) {  
  11.                     e.printStackTrace();  
  12.                 }  
  13.                 return true;  
  14.             };  
  15.         };  
  16.         System.out.println(getNow()+"--開始插入資料");  
  17.         for(int i=0;i<10;i++){  
  18.             cs.submit(task);              
  19.         }  
  20.         for(int i=0;i<10;i++){  
  21.             try {  
  22.                 //ExecutorCompletionService.take()方法是阻塞的,如果當前沒有完成的任務則阻塞  
  23.                 System.out.println(cs.take().get());  
  24.                 //實際使用時,take()方法獲取的結果可用於處理,如果插入失敗,則可以進行重試或記錄等操作  
  25.             } catch (InterruptedException|ExecutionException e) {  
  26.                 e.printStackTrace();  
  27.             }  
  28.         }  
  29.         System.out.println(getNow()+"--插入資料完成");  
  30.         pool.shutdown();  
  31.     }  
  32.     public static String getNow(){  
  33.         SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  34.         return sdf.format(new Date());  
  35.     }  
  36. }  


5.CompletionService與Callable<V>+Future的對比:
在上面的Callable中說過,Callable+Future能實現任務的分治,但是有個問題就是:不知道call()什麼時候完成,需要人為控制等待。
而jdk通過CompetionService已經將此麻煩簡化,通過CompletionService將非同步任務完成的與未完成的區分開來(正如api的描述),我們只用去取即可。
CompletionService有什麼好處呢?
如上所說:1)將已完成的任務和未完成的任務分開了,無需開發者操心;2)隱藏了Future類,簡化了程式碼的使用。真想點個贊!

6.CountDownLatch

1.CountDownLatch:api解釋:一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。個人理解是CountDownLatch讓可以讓一組執行緒同時執行,然後在這組執行緒全部執行完前,可以讓另一個執行緒等待。
就好像跑步比賽,10個選手依次就位,哨聲響才同時出發;所有選手都通過終點,才能公佈成績。那麼CountDownLatch就可以控制10個選手同時出發,和公佈成績的時間。
CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,或入口:在通過呼叫 countDown() 的執行緒開啟入口前,所有呼叫 await 的執行緒都一直在入口處等待。用 N 初始化的 CountDownLatch 可以使一個執行緒在 N 個執行緒完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。 
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);


程式碼示例可參考api的示例。(重要)
2.程式碼示例:

個人示例:

  1. public class CountDownLatchTest {  
  2.     private static SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  3.     public static void main(String[] args) {  
  4.         final CountDownLatch start = new CountDownLatch(1); //用一個訊號控制一組執行緒的開始,初始化為1  
  5.         final CountDownLatch end = new CountDownLatch(10);  //要等待N個執行緒的結束,初始化為N,這裡是10  
  6.         Runnable r = new Runnable(){  
  7.             public void run(){  
  8.                 try {  
  9.                     start.await();  //阻塞,這樣start.countDown()到0,所有阻塞在start.await()處的執行緒一起執行  
  10.                     Thread.sleep((long) (Math.random()*10000));  
  11.                     System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--執行完成");  
  12.                     end.countDown();//非阻塞,每個執行緒執行完,讓end--,這樣10個執行緒執行完end倒數到0,主執行緒的end.await()就可以繼續執行  
  13.                 } catch (InterruptedException e) {  
  14.                     e.printStackTrace();  
  15.                 }  
  16.             }  
  17.         };  
  18.         for(int i=0;i<10;i++){  
  19.             new Thread(r).start();  //雖然開始了10個執行緒,但所有執行緒都阻塞在start.await()處  
  20.         }  
  21.         System.out.println(getNow()+"--執行緒全部啟動完畢,休眠3s再讓10個執行緒一起執行");  
  22.         try {  
  23.             Thread.sleep(3*1000);  
  24.         } catch (InterruptedException e) {  
  25.             e.printStackTrace();  
  26.         }  
  27.         System.out.println(getNow()+"--開始");  
  28.         start.countDown();  //start初始值為1,countDown()變成0,觸發10個執行緒一起執行  
  29.         try {  
  30.             end.await();        //阻塞,等10個執行緒都執行完了才繼續往下。  
  31.         } catch (InterruptedException e) {  
  32.             e.printStackTrace();  
  33.         }  
  34.         System.out.println(getNow()+"--10個執行緒都執行完了,主執行緒繼續往下執行!");  
  35.     }  
  36.     private static String getNow(){  
  37.         return sdf.format(new Date());  
  38.     }  
  39. }  

7.CyclicBarrier

1.一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點。也就是說,這一組執行緒的執行分幾個節點,每個節點往下執行,都需等待其他執行緒,這就需要這種等待具有迴圈性。CyclicBarrier在這樣的情況下就很有用。
2.CyclicBarrier與CountDownLacth的區別:
1)CountDownLacth用於一個執行緒與一組執行緒之間的相互等待。常用的就是一個主執行緒與一組分治執行緒之間的等待:主執行緒發號令,一組執行緒同時執行;一組執行緒依次執行完,再喚醒主執行緒繼續執行;
CyclicBarrier用於一組執行緒執行時,每個執行緒執行有多個節點,每個節點的處理需要相互等待。如:對5個檔案進行處理,按行將各個檔案數字挑出來合併成一行,排序,並輸出到另一個檔案,那每次處理都需要等待5個執行緒讀入下一行。(api示例可供參考)
2)CountDownLacth的處理機制是:初始化一個值N(相當於一組執行緒有N個),每個執行緒呼叫一次countDown(),那麼cdLatch減1,等所有執行緒都呼叫過countDown(),那麼cdLatch值達到0,那麼執行緒從await()處接著玩下執行。
CyclicBarrier的處理機制是:初始化一個值N(相當於一組執行緒有N個),每個執行緒呼叫一次await(),那麼barrier加1,等所有執行緒都呼叫過await(),那麼barrier值達到初始值N,所有執行緒接著往下執行,並將barrier值重置為0,再次迴圈下一個屏障;
3)由2)可以知道,CountDownLatch只可以使用一次,而CyclicBarrier是可以迴圈使用的。
3.個人用於理解的示例:

  1. public class CyclicBarrierTest {  
  2.     private static final CyclicBarrier barrier = new CyclicBarrier(5,  
  3.             new Runnable(){  
  4.                 public void run(){  //每次執行緒到達屏障點,此方法都會執行  
  5.                     System.out.println("\n--------barrier action--------\n");  
  6.                 }  
  7.             });  
  8.     public static void main(String[] args) {  
  9.         for(int i=0;i<5;i++){  
  10.             new Thread(new CyclicBarrierTest().new Worker()).start();  
  11.         }  
  12.     }  
  13.     class Worker implements Runnable{  
  14.         public void run(){  
  15.             try {  
  16.                 System.out.println(Thread.currentThread().getName()+"--第一階段");  
  17.                 Thread.sleep(getRl());  
  18.                 barrier.await();    //每一次await()都會阻塞,等5個執行緒都執行到這一步(相當於barrier++操作,加到初始化值5),才繼續往下執行  
  19.                 System.out.println(Thread.currentThread().getName()+"--第二階段");  
  20.                 Thread.sleep(getRl());  
  21.                 barrier.await();    //每一次5個執行緒都到達共同的屏障節點,會執行barrier初始化引數中定義的Runnable.run()  
  22.                 System.out.println(Thread.currentThread().getName()+"--第三階段");  
  23.                 Thread.sleep(getRl());  
  24.                 barrier.await();  
  25.                 System.out.println(Thread.currentThread().getName()+"--第四階段");  
  26.                 Thread.sleep(getRl());  
  27.                 barrier.await();  
  28.                 System.out.println(Thread.currentThread().getName()+"--第五階段");  
  29.                 Thread.sleep(getRl());  
  30.                 barrier.await();  
  31.                 System.out.println(Thread.currentThread().getName()+"--結束");  
  32.             } catch (InterruptedException | BrokenBarrierException e) {  
  33.                 e.printStackTrace();  
  34.             }  
  35.         }  
  36.     }  
  37.     public static long getRl(){  
  38.         return Math.round(10000);  
  39.     }  
  40. }  


4.參考api的示例。
api的示例自己看,就是加深印象。
但是api中有一點描述:如果屏障操作在執行時不依賴於正掛起的執行緒,則執行緒組中的任何執行緒在獲得釋放時都能執行該操作。為方便此操作,每次呼叫 await() 都將返回能到達屏障處的執行緒的索引。然後,您可以選擇哪個執行緒應該執行屏障操作,例如: 

  1. if (barrier.await() == 0) {  
  2. <span style="white-space:pre">    </span> // log the completion of this iteration  
  3. }  

就是說,barrier.await()還會返回一個int值。這個返回值到底是什麼呢?不是返回的執行緒的索引,返回的是:N-進入等待執行緒數,如5個執行緒,5執行緒都進入等待,那返回值就是0(具體可以參看原始碼)。那麼barrier.await()==0也可以看做是一個N執行緒都達到公共屏障的訊號,然後在此條件下處理原本需要放在Runnable引數中的邏輯。不用擔心多執行緒會多次執行此邏輯,N個執行緒只有一個執行緒barrier.await()==0。

8.Exchanger

1.Exchanger可以在對中對元素進行配對和交換的執行緒的同步點。api上不是太好理解,個人理解說白了就是兩個執行緒交換各自使用的指定記憶體資料。
2.場景:
api中有示例,兩個執行緒A、B,各自有一個數據型別相同的變數a、b,A執行緒往a中填資料(生產),B執行緒從b中取資料(消費)。具體如何讓a、b在記憶體發生關聯,就由Exchanger完成。
api中說:Exchanger 可能被視為 SynchronousQueue 的雙向形式。怎麼理解呢?傳統的SynchronousQueue存取需要同步,就是A放入需要等待B取出,B取出需要等待A放入,在時間上要同步進行。而Exchanger在B取出的時候,A是同步在放入的。即:1)A放入a,a滿,然後與B交換記憶體,那A就可以操作b(b空),而B可以操作a;2)等b被A存滿,a被B用完,再交換;3)那A又填充a,B又消費b,依次迴圈。兩個記憶體在一定程度上是同時被操作的,在時間上不需要同步。
再理解就是:如果生產需要5s,消費需要5s。SynchronousQueue一次存取需要10s,而Exchanger只需要5s。4.注意事項:
目前只知道Exchanger只能發生在兩個執行緒之間。但實際上Exchanger的原始碼是有多個插槽(Slot),交換是通過執行緒ID的hash值來定位的。目前還沒搞懂?待後續。
如果一組執行緒aGroup操作a記憶體,一組執行緒bGroup操作b記憶體,如何交換?能不能交換?
3.程式碼示例:

  1. public class ExchangerTest {  
  2.     private SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  3.     private static Exchanger<Queue<Integer>> changer = new Exchanger<Queue<Integer>>();  
  4.     public static void main(String[] args) {  
  5.         new Thread(new ExchangerTest().new ProducerLoop()).start();  
  6.         new Thread(new ExchangerTest().new ConsumerLoop()).start();  
  7.     }  
  8.     class ProducerLoop implements Runnable{  
  9.         private Queue<Integer> pBuffer = new LinkedBlockingQueue<Integer>();  
  10.         private final int maxnum = 10;  
  11.         @Override  
  12.         public void run() {  
  13.             try{  
  14.                 for(;;){  
  15.                     Thread.sleep(500);  
  16.                     pBuffer.offer((int) Math.round(Math.random()*100));  
  17.                     if(pBuffer.size() == maxnum){  
  18.                         System.out.println(getNow()+"--producer交換前");  
  19.                         pBuffer = changer.exchange(pBuffer);  
  20.                         System.out.println(getNow()+"--producer交換後");  
  21.                     }  
  22.                 }  
  23.             }catch(Exception e){  
  24.                 e.printStackTrace();  
  25.             }  
  26.         }  
  27.     }     
  28.     class ConsumerLoop implements Runnable{  
  29.         private Queue<Integer> cBuffer = new LinkedBlockingQueue<Integer>();  
  30.         @Override  
  31.         public void run() {  
  32.             try{  
  33.                 for(;;){  
  34.                     if(cBuffer.size() == 0){  
  35.                         System.out.println("\n"+getNow()+"--consumer交換前");  
  36.                         cBuffer = changer.exchange(cBuffer);  
  37.                         System.out.println(getNow()+"--consumer交換後");  
  38.                     }  
  39.                     System.out.print(cBuffer.poll()+" ");  
  40.                     Thread.sleep(500);  
  41.                 }  
  42.             }catch(Exception e){  
  43.                 e.printStackTrace();  
  44.             }  
  45.         }  
  46.     }     
  47.     private String getNow(){  
  48.         return sdf.format(new Date());  
  49.     }  
  50. }  

4.注意事項:
目前只知道Exchanger只能發生在兩個執行緒之間。但實際上Exchanger的原始碼是有多個插槽(Slot),交換是通過執行緒ID的hash值來定位的。目前還沒搞懂?待後續。
如果一組執行緒aGroup操作a記憶體,一組執行緒bGroup操作b記憶體,如何交換?能不能交換?

9.Phaser

Phaser是jdk1.7的新特性。其功能類似CyclicBarrier和CountDownLatch,但其功能更靈活,更強大,支援動態調整需要控制的執行緒數。不重複了。參考連結: