java併發與多執行緒API學習
Executor介面
public interface Executor {
void execute(Runnable command);
}
Executor介面中之定義了一個方法execute(Runnable command),該方法接收一個Runable例項,它用來執行一個任務,任務即一個實現了Runnable介面的類。 在Java 5之後,任務分兩類:一類是實現了Runnable介面的類,一類是實現了Callable介面的類。兩者都可以被ExecutorService執行,但是Runnable任務沒有返回值,而Callable任務有返回值。並且Callable的call()方法只能通過ExecutorService的submit(Callable<T> task) 方法來執行,並且返回一個 <T>Future<T>,是表示任務等待完成的Future。 Callable介面類似於Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常而Callable又返回結果,而且當獲取返回結果時可能會丟擲異常。Callable中的call()方法類似Runnable的run()方法,區別同樣是有返回值,後者沒有。 當將一個Callable的物件傳遞給ExecutorService的submit方法,則該call方法自動在一個執行緒上執行,並且會返回執行結果Future物件。同樣,將Runnable的物件傳遞給ExecutorService的submit方法,則該run方法自動在一個執行緒上執行,並且會返回執行結果Future物件,但是在該Future物件上呼叫get方法,將返回null。
ExecutorService介面
public interface ExecutorService extends Executor { void shutdown(); List shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; Future submit(Callable task); Future submit(Runnable task, T result); Future<?> submit(Runnable task); List<Future> invokeAll(Collection<? extends Callable> tasks) throws InterruptedException; List<Future> invokeAll(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException; T invokeAny(Collection<? extends Callable> tasks) throws InterruptedException, ExecutionException; T invokeAny(Collection<? extends Callable> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
一共12個方法,其中一部分是和執行器生命週期相關的方法,而另一部分則是以各種方式提交要執行的任務的方法。像submit()就是提交任務的一個方法,在實現中做了適配的工作,無論引數是Runnable還是Callable,執行器都會正確執行。當然,這實際上用到的是前文提過的RunnableFuture的實現類FutureTask。
ExecutorService介面繼承自Executor介面,它提供了更豐富的實現多執行緒的方法,比如,ExecutorService提供了關閉自己的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。可以呼叫ExecutorService的shutdown()方法來平滑地關閉 ExecutorService,呼叫該方法後,將導致ExecutorService停止接受任何新的任務且等待已經提交的任務執行完成(已經提交的任務會分兩類:一類是已經在執行的,另一類是還沒有開始執行的),當所有已經提交的任務執行完畢後將會關閉ExecutorService。因此我們一般用該介面來實現和管理多執行緒。
ExecutorService的生命週期包括三種狀態:執行、關閉、終止。建立後便進入執行狀態,當呼叫了shutdown()方法時,便進入關閉狀態,此時意味著ExecutorService不再接受新的任務,但它還在執行已經提交了的任務,當素有已經提交了的任務執行完後,便到達終止狀態。如果不呼叫shutdown()方法,ExecutorService會一直處在執行狀態,不斷接收新的任務,執行新的任務,伺服器端一般不需要關閉它,保持一直執行即可。
那為什麼要使用ExecutorService呢?
a. 每次new Thread新建物件效能差。
b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。
c. 缺乏更多功能,如定時執行、定期執行、執行緒中斷。
相比new Thread,Java提供的四種執行緒池的好處在於:
a. 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。
b. 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
c. 提供定時執行、定期執行、單執行緒、併發數控制等功能。
ThreadPoolExecutor
自定義執行緒池,可以用ThreadPoolExecutor類建立,它有多個構造方法來建立執行緒池,用該類很容易實現自定義的執行緒池//建立等待佇列
BlockingQueue<Runnable> bqueue = new ArrayBlockingQueue<Runnable>(20);
//建立執行緒池,池中儲存的執行緒數為3,允許的最大執行緒數為5
ThreadPoolExecutor pool = new ThreadPoolExecutor(3,5,50,TimeUnit.MILLISECONDS,bqueue);
AbstractExecutorService
這個類是一個抽象類,規範了所有 工廠(Executors)構造的例項,也就是所有工廠構建的ExecutorService例項必須繼承自AbstractExecutorService。這個類是ExecutorService的一個抽象實現。其中,提交任務的各類方法已經給出了十分完整的實現。之所以抽象,是因為和執行器本身生命週期相關的方法,在此類中並未給出任何實現,需要子類擴充套件完善。
ScheduleExecutorService & ScheduledThreadPoolExecutor
用於執行定時任務
Executors
Executors提供了一系列工廠方法用於創先執行緒池,返回的執行緒池都實現了ExecutorService介面。這些方法返回的ExecutorService物件最終都是由ThreadPoolExecutor實現的,根據不同的需求以不同的引數配置,或經過其它類包裝。
// 建立固定數目執行緒的執行緒池。
public static ExecutorService newFixedThreadPool(int nThreads)
// 建立一個可快取的執行緒池,呼叫execute將重用以前構造的執行緒(如果執行緒可用)。如果現有執行緒沒有可用的,則建立一個新線 程並新增到池中。終止並從快取中移除那些已 有 60 秒鐘未被使用的執行緒。
public static ExecutorService newCachedThreadPool()
// 建立一個單執行緒化的Executor。
public static ExecutorService newSingleThreadExecutor()
// 建立一個支援定時及週期性的任務執行的執行緒池,多數情況下可用來替代Timer類。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
newCachedThreadPool() |
-快取型池子,先檢視池中有沒有以前建立的執行緒,如果有,就 reuse.如果沒有,就建一個新的執行緒加入池中 |
newFixedThreadPool(int) |
-newFixedThreadPool與cacheThreadPool差不多,也是能reuse就用,但不能隨時建新的執行緒 |
newScheduledThreadPool(int) |
-排程型執行緒池 |
newSingleThreadExecutor() |
-單例執行緒,任意時間池中只能有一個執行緒 |
一般來說,CachedTheadPool在程式執行過程中通常會建立與所需數量相同的執行緒,然後在它回收舊執行緒時停止建立新執行緒,因此它是合理的Executor的首選,只有當這種方式會引發問題時(比如需要大量長時間面向連線的執行緒時),才需要考慮用FixedThreadPool
CompletionService & ExecutorCompletionService
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
<span style="font-family:Microsoft YaHei;">}</span>
這個介面是為了方便多個任務執行時,可以方便得獲取到執行任務的Future結果。同樣,也是五個方法,分為兩大方面。一個是對Callable和Runnable型別引數的任務提交,另一方面則是嘗試對結果以不同的方式進行獲取,take()方法一般是阻塞式的獲取,後兩者則更靈活。
ExecutorCompletionService這個實現類主要做的就是將執行完成的任務結果放到阻塞佇列中,這樣等待結果的執行緒,如執行take()方法會得到結果並恢復執行。
ExecutorCompletionService有3個屬性:
AbstractExecutorService類的物件aes
Executor類的物件executor
BlockingQueue<Future<V>>的completionQueue
通常,如果executor是AbstractExecutorService的一個實現,則將其賦值給aes屬性,否則賦值為null。
在這個類中,executor負責執行任務,而aes則負責做適配處理,返回包裝好任務的FutureTask物件。
這裡面有一個對於實現功能很重要的內部類QueueingFuture,實現如下:
private class QueueingFuture extends FutureTask {
QueueingFuture(RunnableFuture task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future task;
}
主要是擴充套件了FutureTask的done方法,將執行結果放入BlockingQueue中
Callable
很多場景一個任務完成執行,我們是需要知道它的結果的。為了彌補這個不足, 從JDK1.5開始,在java.util.concurrent包中就有了Callable這個介面。
public interface Callable<V> {
V call() throws Exception;
}
注意到,其中call()方法除了有返回結果以外,比起run()還有異常丟擲,這個使用時是要注意的。
在JavaSE5之後,執行器Executor是JDK提供給我們的良好工具,在ExecutorService中也有了支援Callable的submit()方法,那麼對於其call()的執行結果我們如何來取呢,這就要提到另一個類——java.util.concurrent.Future。
Future
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask
FutureTask是Future具體的實現類
RunnableFuture & RunnableScheduledFuture
public interface RunnableFuture extends Runnable, Future {
void run();
}
RunnableFuture把Runnable和Future兩個介面捏到一起了。實際上,這個介面用意應該是這樣的,將需要run()的任務和結果結合在一起,執行了run()能夠保證結果被設定從而可以獲取到。在ExecutorService中,submit()方法會有很多過載實現,有的用Runnable引數,有的用Callable引數。而對於submit()方法本身的實際操作,就是:執行任務和返回Future物件。
在實現中,AbstractExecutorService的submit()方法無論傳入的是Callable還是Runnable,都會呼叫newTaskFor()將其轉變為一個RunnableFuture物件,這個物件既會被用來呼叫Executor的execute()方法,也會作為Future結果返回。
JDK1.6之後,FutureTask也就成為了RunnableFuture的一個實現,當然也還是Future的實現類。我們再來簡單看下它的實現。
前面文章提到過AbstractQueuedSynchronizer類(AQS)的應用,實際上FutureTask中也有一個名為Sync而且繼承於AQS的內部類。在FutureTask的實現中,每個任務都有執行狀態,其巧妙地運用了AQS提供的state屬性和protected方法,保證了對Future結果獲取執行緒的阻塞和喚醒。
Atomic類
Atomic類是通過硬體支援的CAS操作實現的,其基本思想是如果這個地址上的值和期望的值相等,則給其賦予新值,否則不做任何事兒,但是要返回原值是多少。在java 7中,共有以下幾種Atomic類:標量類:AtomicBoolean,AtomicInteger,AtomicLong,AtomicReference
陣列類:AtomicIntegerArray,AtomicLongArray,AtomicReferenceArray
更新器類:AtomicLongFieldUpdater,AtomicIntegerFieldUpdater,AtomicReferenceFieldUpdater
複合變數類:AtomicMarkableReference,AtomicStampedReference
當執行緒數較多,競爭較為激烈的情況下,CAS效率也會受到影響。因此,在java 8中引入了LongAdder, DoubleAdder等類,他們的思路是通過分段,不同執行緒在不同段上做累加,最後進行彙總。
併發容器
ConcurrentMap
用分離鎖實現多個執行緒間的更深層次的共享訪問。
用 HashEntery 物件的不變性來降低執行讀操作的執行緒在遍歷連結串列期間對加鎖的需求。
通過對同一個Volatile 變數的寫/讀訪問,協調不同執行緒間讀/寫操作的記憶體可見性。
ConcurrentNavigableMap & ConcurrentSkipListMap
ConcurrentSkipListMap不同於TreeMap,前者使用SkipList(跳錶)實現排序,而後者使用紅黑樹。相比紅黑樹,跳錶的原理比較容易理解,簡單點說就是在有序的連結串列上使用多級索引來定位元素。
ConcurrentSkiplistMap VS ConcurrentHashMap
在4執行緒1.6萬資料的條件下,ConcurrentHashMap 存取速度是ConcurrentSkipListMap 的4倍左右。但ConcurrentSkipListMap有幾個ConcurrentHashMap 不能比擬的優點:1、ConcurrentSkipListMap 的key是有序的。
2、ConcurrentSkipListMap 支援更高的併發。ConcurrentSkipListMap 的存取時間是log(N),和執行緒數幾乎無關。也就是說在資料量一定的情況下,併發的執行緒越多,ConcurrentSkipListMap越能體現出他的優勢。
如果需要排序map,在非多執行緒的情況下,應當儘量使用TreeMap。此外對於併發性相對較低的並行程式可以使用Collections.synchronizedSortedMap將TreeMap進行包裝,也可以提供較好的效率。對於高併發程式,應當使用ConcurrentSkipListMap,能夠提供更高的併發度。
ConcurrentSkipListSet
CopyOnWriteArrayList & CopyOnWriteArraySet
佇列
阻塞佇列 BlockingQueue
ArrayBlockingQueue
基於陣列實現,一個Lock控制互斥訪問,兩個condition
DelayQueue
延時佇列
LinkedBlockingQueue
基於連結串列實現,兩個鎖實現
PriorityBlockingQueue
優先順序佇列
SynchronousQueue
同步佇列
BlockingDeque & LinkedBlockingDeque
非阻塞佇列 ConcurrentLinkedQueue
Lock介面
Lock & ReentrantLock
Java提供了另一種同步程式碼塊的機制,它是基於Lock介面和它的實現類(例如ReentrantLock)來實現的,這種機制更加強大和靈活,對比Synchronized方法或者Synchronized程式碼塊主要的優點表現在:1)Lock介面允許更加複雜的結構,synchronized關鍵字必須要用結構化的方法來獲取或者釋放同步程式碼塊;
2)Lock介面提供了一些額外的功能。例如tryLock()方法。
3)當只有一個寫和多個讀的執行緒時,Lock介面允許讀寫操作的分離
4)Lock介面的效能更高
ReadWriteLock & ReentrantReadWriteLock
ReadWriteLock是所有Lock介面中最重要的介面之一,ReentrentReadWriteLock是它的唯一實現類。
該類有兩個鎖,一個是讀操作另一個是寫操作。它能夠同時包括多個讀操作,但是隻能有一個寫操作。當某個執行緒執行寫操作時,其他任何執行緒都不能執行讀操作。
Condition
Condition 將 Object 監視器方法(wait、notify 和 notifyAll)分解成截然不同的物件,以便通過將這些物件與任意 Lock 實現組合使用,為每個物件提供多個等待 set(wait-set)。其中,Lock 替代了 synchronized 方法和語句的使用,Condition 替代了 Object 監視器方法的使用。Condition 例項實質上被繫結到一個鎖上。要為特定 Lock 例項獲得 Condition 例項,請使用其 newCondition() 方法。等待用await(),釋放用signal().
Synchronized與Lock區別
synchronized是託管給JVM執行的,而lock是java寫的控制鎖的程式碼。Lock提供了公平鎖;
Lock提供了Condition;
Lock提供了取消鎖等待;
synchronized和lock效能區別
在Java1.5中,synchronize是效能低效的。因為這是一個重量級操作,需要呼叫操作介面,導致有可能加鎖消耗的系統時間比加鎖以外的操作還多。相比之下使用Java提供的Lock物件,效能更高一些。但是到了Java1.6,發生了變化。synchronize在語義上很清晰,可以進行很多優化,有適應自旋,鎖消除,鎖粗化,輕量級鎖,偏向鎖等等。導致在Java1.6上synchronize的效能並不比Lock差。官方也表示,他們也更支援synchronize,在未來的版本中還有優化餘地。說到這裡,還是想提一下這2中機制的具體區別。據我所知,synchronized原始採用的是CPU悲觀鎖機制,即執行緒獲得的是獨佔鎖。獨佔鎖意味著其他執行緒只能依靠阻塞來等待執行緒釋放鎖。而在CPU轉換執行緒阻塞時會引起執行緒上下文切換,當有很多執行緒競爭鎖的時候,會引起CPU頻繁的上下文切換導致效率很低。
而Lock用的是樂觀鎖方式。所謂樂觀鎖就是,每次不加鎖而是假設沒有衝突而去完成某項操作,如果因為衝突失敗就重試,直到成功為止。樂觀鎖實現的機制就是CAS操作(Compare and Swap)。我們可以進一步研究ReentrantLock的原始碼,會發現其中比較重要的獲得鎖的一個方法是compareAndSetState。這裡其實就是呼叫的CPU提供的特殊指令。
現代的CPU提供了指令,可以自動更新共享資料,而且能夠檢測到其他執行緒的干擾,而 compareAndSet() 就用這些代替了鎖定。這個演算法稱作非阻塞演算法,意思是一個執行緒的失敗或者掛起不應該影響其他執行緒的失敗或掛起的演算法。
同步器
訊號量Semaphore
Semaphore控制同時訪問資源的執行緒個數,如三臺印表機,則只允許三個執行緒從job佇列中取出job並執行
閉鎖CountDownLatch
例如,會議室人到齊後開始會議。Conference持有一個CountDownLatch物件,設定了初始值,每個Participant持有Conference物件,當執行Participant的run方法時,呼叫Conference的arrive方法,從而將Conference的CountDownLatch執行countDown操作。在Conference的run方法中,執行CountDownLatch的await方法。
關卡CyclicBarrier
表示請大家等待,等所有集合都準備好了,那麼就開始執行,這個過程可以迴圈。比如:公司部門的週末準備一起出去遊玩,先等到所有的人到達汽車才開始啟動車輛到目的地去,到後自由玩,然後到1點在一起吃飯。
將CyclicBarrier物件傳給所有的Mapper,等所有的MapperTask執行完,將執行結果放入結果集後,呼叫barrier.await()方法。這時,執行緒會等待所有的MapperTask執行完。之後會自動呼叫傳入CyclicBarrier的一個Runnable物件,並執行其run方法。
交換器Exchanger
用於兩個執行緒在執行時資料交換
計時器Timer
主要用於執行定時任務。Timer就是一個執行緒,使用schedule方法完成對TimerTask的排程,多個TimerTask可以共用一個Timer,也就是說Timer物件呼叫一次schedule方法就是建立了一個執行緒,並且呼叫一次schedule後TimerTask是無限制的迴圈下去的,使用Timer的cancel()停止操作。當然同一個Timer執行一次cancel()方法後,所有Timer執行緒都被終止。//以下是幾種排程task的方法:
timer.schedule(task, time);
// time為Date型別:在指定時間執行一次。
timer.schedule(task, firstTime, period);
// firstTime為Date型別,period為long
// 從firstTime時刻開始,每隔period毫秒執行一次。
timer.schedule(task, delay)
// delay 為long型別:從現在起過delay毫秒執行一次
timer.schedule(task, delay, period)
// delay為long,period為long:從現在起過delay毫秒以後,每隔period
// 毫秒執行一次。 在java正則表示式中,當遇到某些特殊的正則表示式和需要匹配的字串時,會發生regex hang issue,例如如下程式將會導致CPU執行時間較長,如果content中的A的個數更多時,執行時間是指數級增長的。
Pattern pattern = Pattern.compile("(A*)*A");
String content = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB";
long current = System.currentTimeMillis();
Matcher matcher = pattern.matcher(content);
if(matcher.matches()) {
System.out.println("Matched");
} else {
System.out.println("Not Matched");
}
System.out.println(System.currentTimeMillis()-current);
對於上述問題,我們可以將content轉換成一個字元序列CharSequence,在正則表示式匹配的時候,會隱式呼叫charAt()方法。因此,可以重寫/覆蓋charAt()方法,即每次呼叫的時候,加上一個標識判斷,如果標識為false,則退出。另一方面,可以定義一個Timer,當達到了指定delay時間時,改變標識為false,就可以順利退出match。具體實現如下:
public class MySequence implements CharSequence {
private final CharSequence content;
private final AtomicBoolean signal;
public MySequence(CharSequence content, AtomicBoolean signal) {
this.content = content;
this.signal = signal;
}
@Override
public int length() {
return content.length();
}
@Override
public char charAt(int index) {
if(signal.get())
throw new RuntimeException(new MyException("regex hang"));
return content.charAt(index);
}
@Override
public CharSequence subSequence(int start, int end) {
return new TimeoutRegexCharSequence(content.subSequence(start, end), signal);
}
@Override
public String toString() {
return content.toString();
}
}
public class MyTimerTask extends TimerTask {
private volatile AtomicBoolean signal;
public MyTimerTask(AtomicBoolean signal) {
this.signal = signal;
}
@Override
public void run() {
signal.set(true);
}
}
Pattern pattern = Pattern.compile("(A*)*A");
String content = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAB";
Timer timer = new Timer("name", true);
final AtomicBoolean signal = new AtomicBoolean(false);
MyTimerTask timerTask = new MyTimerTask(signal);
timer.schedule(timerTask, 10*1000);
MySequence timeoutInput = new MySequence(content, signal);
Matcher matcher;
try {
matcher = pattern.matcher(timeoutInput);
boolean isMatched = matcher.matches();
} catch(RuntimeException ex) {
Throwable th = ex.getCause();
if(th instanceof MyException) {
System.out.println("regex hang");
}
throw ex;
} finally {
timerTask.cancel();
signal.set(false);
}