java.util.concurrent包下的類 (轉)
文章整理來源:http://www.cnblogs.com/aurawing/articles/1887056.html
一 ,背景:
在JDK1.5之前,Java中要進行業務併發時,通常需要有程式設計師獨立完成程式碼實現,當然也有一些開源的框架提供了這些功能,但是這些依然沒有JDK自帶的功能使用起來方便。
而當針對高質量Java多執行緒併發程式設計時,為防止死鎖等現象的出現,比如使用java之前的wait()、notify()和synchronized等,每每需要考慮效能、死鎖、公平性、資源管理以及如何避免執行緒安全性方面帶來的危害等諸多因素,往往會採用一些較為複雜的安全策略,加重了程式設計師的開發負擔.
萬幸的是,在JDK1.5出現之後,Sun大神(Doug Lea)終於為我們這些可憐的小程式設計師推出了java.util.concurrent工具包以簡化併發完成。開發者們藉助於此,將有效的減少競爭條件(race conditions)和死鎖執行緒。concurrent包很好的解決了這些問題,為我們提供了更實用的併發程式模型。
二簡介
以下是concurrent包下的主要類:
Executor :具體Runnable任務的執行者。
ExecutorService :一個執行緒池管理者,其實現類有多種。我們能把Runnable,Callable提交到池中讓其排程。
Semaphore :一個計數訊號量
ReentrantLock :一個可重入的互斥鎖定 Lock,功能類似synchronized,但要強大的多。
Future :是與Runnable,Callable進行互動的介面,比如一個執行緒執行結束後取返回的結果等,還提供了cancel終止執行緒。
BlockingQueue :阻塞佇列。
CompletionService : ExecutorService的擴充套件,可以獲得執行緒執行結果的
CountDownLatch :一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。
CyclicBarrier :一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點
Future :Future 表示非同步計算的結果。
ScheduledExecutorService :一個 ExecutorService,可安排在給定的延遲後執行或定期執行的命令。
三 詳細介紹
接下來逐一介紹
3.1 Executors主要方法說明
newFixedThreadPool(固定大小執行緒池)
建立一個可重用固定執行緒集合的執行緒池,以共享的無界佇列方式來執行這些執行緒(只有要請求的過來,就會在一個佇列裡等待執行)。如果在關閉前的執行期間由於失敗而導致任何執行緒終止,那麼一個新執行緒將代替它執行後續的任務(如果需要)。
newCachedThreadPool(無界執行緒池,可以進行自動執行緒回收)
建立一個可根據需要建立新執行緒的執行緒池,但是在以前構造的執行緒可用時將重用它們。對於執行很多短期非同步任務的程式而言,這些執行緒池通常可提高程式效能。呼叫 execute 將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新執行緒並新增到池中。終止並從快取中移除那些已有 60 秒鐘未被使用的執行緒。因此,長時間保持空閒的執行緒池不會使用任何資源。注意,可以使用 ThreadPoolExecutor 構造方法建立具有類似屬性但細節不同(例如超時引數)的執行緒池。
newSingleThreadExecutor(單個後臺執行緒)
建立一個使用單個 worker 執行緒的 Executor,以無界佇列方式來執行該執行緒。(注意,如果因為在關閉前的執行期間出現失敗而終止了此單個執行緒,那麼如果需要,一個新執行緒將代替它執行後續的任務)。可保證順序地執行各個任務,並且在任意給定的時間不會有多個執行緒是活動的。與其他等效的 newFixedThreadPool(1) 不同,可保證無需重新配置此方法所返回的執行程式即可使用其他的執行緒。
這些方法返回的都是ExecutorService物件,這個物件可以理解為就是一個執行緒池。
這個執行緒池的功能還是比較完善的。可以提交任務submit()可以結束執行緒池shutdown()。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyExecutor extends Thread {
private int index;
public MyExecutor(int i){
this.index=i;
}
public void run(){
try{
System.out.println("["+this.index+"] start....");
Thread.sleep((int)(Math.random()*1000));
System.out.println("["+this.index+"] end.");
}
catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ExecutorService service=Executors.newFixedThreadPool(4);
for(int i=0;i<10;i++){
service.execute(new MyExecutor(i));
//service.submit(new MyExecutor(i));
}
System.out.println("submit finish");
service.shutdown();
}
}
雖然列印了一些資訊,但是看的不是非常清晰,這個執行緒池是如何工作的,我們來將休眠的時間調長10倍。
Thread.sleep((int)(Math.random()*10000));
再來看,會清楚看到只能執行4個執行緒。當執行完一個執行緒後,才會又執行一個新的執行緒,也就是說,我們將所有的執行緒提交後,執行緒池會等待執行完最後shutdown。我們也會發現,提交的執行緒被放到一個“無界佇列裡”。這是一個有序佇列(BlockingQueue,這個下面會說到)。
另外它使用了Executors的靜態函式生成一個固定的執行緒池,顧名思義,執行緒池的執行緒是不會釋放的,即使它是Idle。
這就會產生效能問題,比如如果執行緒池的大小為200,當全部使用完畢後,所有的執行緒會繼續留在池中,相應的記憶體和執行緒切換(while(true)+sleep迴圈)都會增加。
如果要避免這個問題,就必須直接使用ThreadPoolExecutor()來構造。可以像通用的執行緒池一樣設定“最大執行緒數”、“最小執行緒數”和“空閒執行緒keepAlive的時間”。
這個就是執行緒池基本用法。
3.2 Semaphore
一個計數訊號量。從概念上講,訊號量維護了一個許可集合。如有必要,在許可可用前會阻塞每一個 acquire(),然後再獲取該許可。每個 release() 新增一個許可,從而可能釋放一個正在阻塞的獲取者。但是,不使用實際的許可物件,Semaphore 只對可用許可的號碼進行計數,並採取相應的行動。
Semaphore 通常用於限制可以訪問某些資源(物理或邏輯的)的執行緒數目。例如,下面的類使用訊號量控制對內容池的訪問:
這裡是一個實際的情況,大家排隊上廁所,廁所只有兩個位置,來了10個人需要排隊。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
public class MySemaphore extends Thread {
Semaphore position;
private int id;
public MySemaphore(int i,Semaphore s){
this.id=i;
this.position=s;
}
public void run(){
try{
if(position.availablePermits()>0){
System.out.println("顧客["+this.id+"]進入廁所,有空位");
}
else{
System.out.println("顧客["+this.id+"]進入廁所,沒空位,排隊");
}
position.acquire();
System.out.println("顧客["+this.id+"]獲得坑位");
Thread.sleep((int)(Math.random()*1000));
System.out.println("顧客["+this.id+"]使用完畢");
position.release();
}
catch(Exception e){
e.printStackTrace();
}
}
public static void main(String args[]){
ExecutorService list=Executors.newCachedThreadPool();
Semaphore position=new Semaphore(2);
for(int i=0;i<10;i++){
list.submit(new MySemaphore(i+1,position));
}
list.shutdown();
position.acquireUninterruptibly(2);
System.out.println("使用完畢,需要清掃了");
position.release(2);
}
}
3.3 ReentrantLock
一個可重入的互斥鎖定 Lock,它具有與使用 synchronized 方法和語句所訪問的隱式監視器鎖定相同的一些基本行為和語義,但功能更強大。
ReentrantLock 將由最近成功獲得鎖定,並且還沒有釋放該鎖定的執行緒所擁有。當鎖定沒有被另一個執行緒所擁有時,呼叫 lock 的執行緒將成功獲取該鎖定並返回。如果當前執行緒已經擁有該鎖定,此方法將立即返回。可以使用 isHeldByCurrentThread() 和 getHoldCount() 方法來檢查此情況是否發生。
此類的構造方法接受一個可選的公平引數。
當設定為 true時,在多個執行緒的爭用下,這些鎖定傾向於將訪問權授予等待時間最長的執行緒。否則此鎖定將無法保證任何特定訪問順序。
與採用預設設定(使用不公平鎖定)相比,使用公平鎖定的程式在許多執行緒訪問時表現為很低的總體吞吐量(即速度很慢,常常極其慢),但是在獲得鎖定和保證鎖定分配的均衡性時差異較小。不過要注意的是,公平鎖定不能保證執行緒排程的公平性。因此,使用公平鎖定的眾多執行緒中的一員可能獲得多倍的成功機會,這種情況發生在其他活動執行緒沒有被處理並且目前並未持有鎖定時。還要注意的是,未定時的 tryLock 方法並沒有使用公平設定。因為即使其他執行緒正在等待,只要該鎖定是可用的,此方法就可以獲得成功。
建議總是 立即實踐,使用 try 塊來呼叫 lock,在之前/之後的構造中,最典型的程式碼如下:
class X {
private final ReentrantLock lock = new ReentrantLock();
// ...
public void m() {
lock.lock(); // block until condition holds
try {
// ... method body
} finally {
lock.unlock()
}
}
}
例:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock;
public class MyReentrantLock extends Thread{
TestReentrantLock lock;
private int id;
public MyReentrantLock(int i,TestReentrantLock test){
this.id=i;
this.lock=test;
}
public void run(){
lock.print(id);
}
public static void main(String args[]){
ExecutorService service=Executors.newCachedThreadPool();
TestReentrantLock lock=new TestReentrantLock();
for(int i=0;i<10;i++){
service.submit(new MyReentrantLock(i,lock));
}
service.shutdown();
}
}
class TestReentrantLock{
private ReentrantLock lock=new ReentrantLock();
public void print(int str){
try{
lock.lock();
System.out.println(str+"獲得");
Thread.sleep((int)(Math.random()*1000));
}
catch(Exception e){
e.printStackTrace();
}
finally{
System.out.println(str+"釋放");
lock.unlock();
}
}
}
3.4 BlockingQueue
支援兩個附加操作的 Queue,這兩個操作是:檢索元素時等待佇列變為非空,以及儲存元素時等待空間變得可用。
BlockingQueue 不接受 null 元素。試圖 add、put 或 offer 一個 null 元素時,某些實現會丟擲 NullPointerException。null 被用作指示 poll 操作失敗的警戒值。
BlockingQueue 可以是限定容量的。它在任意給定時間都可以有一個 remainingCapacity,超出此容量,便無法無阻塞地 put 額外的元素。
沒有任何內部容量約束的 BlockingQueue 總是報告 Integer.MAX_VALUE 的剩餘容量。
BlockingQueue 實現主要用於生產者-使用者佇列,但它另外還支援 Collection 介面。因此,舉例來說,使用 remove(x) 從佇列中移除任意一個元素是有可能的。
然而,這種操作通常不 會有效執行,只能有計劃地偶爾使用,比如在取消排隊資訊時。
BlockingQueue 實現是執行緒安全的。所有排隊方法都可以使用內部鎖定或其他形式的併發控制來自動達到它們的目的。
然而,大量的 Collection 操作(addAll、containsAll、retainAll 和 removeAll)沒有 必要自動執行,除非在實現中特別說明。
因此,舉例來說,在只添加了 c 中的一些元素後,addAll(c) 有可能失敗(丟擲一個異常)。
BlockingQueue 實質上不 支援使用任何一種“close”或“shutdown”操作來指示不再新增任何項。
這種功能的需求和使用有依賴於實現的傾向。例如,一種常用的策略是:對於生產者,插入特殊的 end-of-stream 或 poison 物件,並根據使用者獲取這些物件的時間來對它們進行解釋。
下面的例子演示了這個阻塞佇列的基本功能。
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
public class MyBlockingQueue extends Thread {
public static BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
private int index;
public MyBlockingQueue(int i) {
this.index = i;
}
public void run() {
try {
queue.put(String.valueOf(this.index));
System.out.println("{" + this.index + "} in queue!");
} catch (Exception e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
service.submit(new MyBlockingQueue(i));
}
Thread thread = new Thread() {
public void run() {
try {
while (true) {
Thread.sleep((int) (Math.random() * 1000));
if(MyBlockingQueue.queue.isEmpty())
break;
String str = MyBlockingQueue.queue.take();
System.out.println(str + " has take!");
}
} catch (Exception e) {
e.printStackTrace();
}
}
};
service.submit(thread);
service.shutdown();
}
}
---------------------執行結果-----------------
{0} in queue!
{1} in queue!
{2} in queue!
{3} in queue!
0 has take!
{4} in queue!
1 has take!
{6} in queue!
2 has take!
{7} in queue!
3 has take!
{8} in queue!
4 has take!
{5} in queue!
6 has take!
{9} in queue!
7 has take!
8 has take!
5 has take!
9 has take!
-----------------------------------------
3.5 CompletionService
將生產新的非同步任務與使用已完成任務的結果分離開來的服務。生產者 submit 執行的任務。使用者 take 已完成的任務,
並按照完成這些任務的順序處理它們的結果。例如,CompletionService 可以用來管理非同步 IO ,執行讀操作的任務作為程式或系統的一部分提交,
然後,當完成讀操作時,會在程式的不同部分執行其他操作,執行操作的順序可能與所請求的順序不同。
通常,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種情況下,
CompletionService 只管理一個內部完成佇列。ExecutorCompletionService 類提供了此方法的一個實現。
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class MyCompletionService implements Callable<String> {
private int id;
public MyCompletionService(int i){
this.id=i;
}
public static void main(String[] args) throws Exception{
ExecutorService service=Executors.newCachedThreadPool();
CompletionService<String> completion=new ExecutorCompletionService<String>(service);
for(int i=0;i<10;i++){
completion.submit(new MyCompletionService(i));
}
for(int i=0;i<10;i++){
System.out.println(completion.take().get());
}
service.shutdown();
}
public String call() throws Exception {
Integer time=(int)(Math.random()*1000);
try{
System.out.println(this.id+" start");
Thread.sleep(time);
System.out.println(this.id+" end");
}
catch(Exception e){
e.printStackTrace();
}
return this.id+":"+time;
}
}
3.6 CountDownLatch
一個同步輔助類,在完成一組正在其他執行緒中執行的操作之前,它允許一個或多個執行緒一直等待。
用給定的計數 初始化 CountDownLatch。由於呼叫了 countDown() 方法,所以在當前計數到達零之前,await 方法會一直受阻塞。
之後,會釋放所有等待的執行緒,await 的所有後續呼叫都將立即返回。這種現象只出現一次——計數無法被重置。如果需要重置計數,請考慮使用 CyclicBarrier。
CountDownLatch 是一個通用同步工具,它有很多用途。將計數 1 初始化的 CountDownLatch 用作一個簡單的開/關鎖存器,
或入口:在通過呼叫 countDown() 的執行緒開啟入口前,所有呼叫 await 的執行緒都一直在入口處等待。
用 N 初始化的 CountDownLatch 可以使一個執行緒在 N 個執行緒完成某項操作之前一直等待,或者使其在某項操作完成 N 次之前一直等待。
CountDownLatch 的一個有用特性是,它不要求呼叫 countDown 方法的執行緒等到計數到達零時才繼續,
而在所有執行緒都能通過之前,它只是阻止任何執行緒繼續通過一個 await。
以下的例子是別人寫的,非常形象。
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCountDownLatch {
public static void main(String[] args) throws InterruptedException {
// 開始的倒數鎖
final CountDownLatch begin = new CountDownLatch(1);
// 結束的倒數鎖
final CountDownLatch end = new CountDownLatch(10);
// 十名選手
final ExecutorService exec = Executors.newFixedThreadPool(10);
for (int index = 0; index < 10; index++) {
final int NO = index + 1;
Runnable run = new Runnable() {
public void run() {
try {
begin.await();//一直阻塞
Thread.sleep((long) (Math.random() * 10000));
System.out.println("No." + NO + " arrived");
} catch (InterruptedException e) {
} finally {
end.countDown();
}
}
};
exec.submit(run);
}
System.out.println("Game Start");
begin.countDown();
end.await();
System.out.println("Game Over");
exec.shutdown();
}
}
CountDownLatch最重要的方法是countDown()和await(),前者主要是倒數一次,後者是等待倒數到0,如果沒有到達0,就只有阻塞等待了
3.7 CyclicBarrier
一個同步輔助類,它允許一組執行緒互相等待,直到到達某個公共屏障點 (common barrier point)。
在涉及一組固定大小的執行緒的程式中,這些執行緒必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待執行緒後可以重用,所以稱它為迴圈 的 barrier。
CyclicBarrier 支援一個可選的 Runnable 命令,在一組執行緒中的最後一個執行緒到達之後(但在釋放所有執行緒之前),
該命令只在每個屏障點執行一次。若在繼續所有參與執行緒之前更新共享狀態,此屏障操作 很有用。
示例用法:下面是一個在並行分解設計中使用 barrier 的例子,很經典的旅行團例子:
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestCyclicBarrier {
// 徒步需要的時間: Shenzhen, Guangzhou, Shaoguan, Changsha, Wuhan
private static int[] timeWalk = { 5, 8, 15, 15, 10 };
// 自駕遊
private static int[] timeSelf = { 1, 3, 4, 4, 5 };
// 旅遊大巴
private static int[] timeBus = { 2, 4, 6, 6, 7 };
static String now() {
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
return sdf.format(new Date()) + ": ";
}
static class Tour implements Runnable {
private int[] times;
private CyclicBarrier barrier;
private String tourName;
public Tour(CyclicBarrier barrier, String tourName, int[] times) {
this.times = times;
this.tourName = tourName;
this.barrier = barrier;
}
public void run() {
try {
Thread.sleep(times[0] * 1000);
System.out.println(now() + tourName + " Reached Shenzhen");
barrier.await();
Thread.sleep(times[1] * 1000);
System.out.println(now() + tourName + " Reached Guangzhou");
barrier.await();
Thread.sleep(times[2] * 1000);
System.out.println(now() + tourName + " Reached Shaoguan");
barrier.await();
Thread.sleep(times[3] * 1000);
System.out.println(now() + tourName + " Reached Changsha");
barrier.await();
Thread.sleep(times[4] * 1000);
System.out.println(now() + tourName + " Reached Wuhan");
barrier.await();
} catch (InterruptedException e) {
} catch (BrokenBarrierException e) {
}
}
}
public static void main(String[] args) {
// 三個旅行團
CyclicBarrier barrier = new CyclicBarrier(3);
ExecutorService exec = Executors.newFixedThreadPool(3);
exec.submit(new Tour(barrier, "WalkTour", timeWalk));
exec.submit(new Tour(barrier, "SelfTour", timeSelf));
//當我們把下面的這段程式碼註釋後,會發現,程式阻塞了,無法繼續執行下去。
exec.submit(new Tour(barrier, "BusTour", timeBus));
exec.shutdown();
}
}
CyclicBarrier最重要的屬性就是參與者個數,另外最要方法是await()。當所有執行緒都呼叫了await()後,就表示這些執行緒都可以繼續執行,否則就會等待。
3.8 Future
Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並檢索計算的結果。
計算完成後只能使用 get 方法來檢索結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。
還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。
如果為了可取消性而使用 Future但又不提供可用的結果,則可以宣告 Future<?> 形式型別、並返回 null 作為基礎任務的結果。
這個我們在前面CompletionService已經看到了,這個Future的功能,而且這個可以在提交執行緒的時候被指定為一個返回物件的。
3.9 ScheduledExecutorService
一個 ExecutorService,可安排在給定的延遲後執行或定期執行的命令。
schedule 方法使用各種延遲建立任務,並返回一個可用於取消或檢查執行的任務物件。scheduleAtFixedRate 和 scheduleWithFixedDelay 方法建立並執行某些在取消前一直定期執行的任務。
用 Executor.execute(java.lang.Runnable) 和 ExecutorService 的 submit 方法所提交的命令,通過所請求的 0 延遲進行安排。
schedule 方法中允許出現 0 和負數延遲(但不是週期),並將這些視為一種立即執行的請求。
所有的 schedule 方法都接受相對 延遲和週期作為引數,而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉換成要求的形式很容易。
例如,要安排在某個以後的日期執行,可以使用:schedule(task, date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。
但是要注意,由於網路時間同步協議、時鐘漂移或其他因素的存在,因此相對延遲的期滿日期不必與啟用任務的當前 Date 相符。
Executors 類為此包中所提供的 ScheduledExecutorService 實現提供了便捷的工廠方法。
一下的例子也是網上比較流行的。
import static java.util.concurrent.TimeUnit.SECONDS;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
public class TestScheduledThread {
public static void main(String[] args) {
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(2);
final Runnable beeper = new Runnable() {
int count = 0;
public void run() {
System.out.println(new Date() + " beep " + (++count));
}
};
// 1秒鐘後執行,並每隔2秒執行一次
final ScheduledFuture beeperHandle = scheduler.scheduleAtFixedRate(beeper, 1, 2, SECONDS);
// 2秒鐘後執行,並每次在上次任務執行完後等待5秒後重新執行
final ScheduledFuture beeperHandle2 = scheduler.scheduleWithFixedDelay(beeper, 2, 5, SECONDS);
// 30秒後結束關閉任務,並且關閉Scheduler
scheduler.schedule(new Runnable() {
public void run() {
beeperHandle.cancel(true);
beeperHandle2.cancel(true);
scheduler.shutdown();
}
}, 30, SECONDS);
}
}