Java併發庫(九、十):執行緒池、Callable、Future
深切懷念傳智播客張孝祥老師,特將其代表作——Java併發庫視訊研讀兩遍,受益頗豐,記以後閱
http://blog.csdn.net/mmc_maodun/article/category/1773509
09.java5執行緒併發庫的應用
如果沒有執行緒池,需要在run方法中不停判斷,還有沒有任務需要執行
執行緒池的通俗比喻:接待客戶,為每個客戶都安排一個工作人員,接待完成後該工作人員就廢掉。伺服器每收到一個客戶請求就為其分配一個執行緒提供服務,服務結束後銷燬執行緒,不斷建立、銷燬執行緒,影響效能。
執行緒池:先建立多個執行緒放線上程池中,當有任務需要執行時,從執行緒池中找一個空閒執行緒執行任務,任務完成後,並不銷燬執行緒,而是返回執行緒池,等待新的任務安排。
執行緒池程式設計中,任務是提交給整個執行緒池的,並不是提交給某個具體的執行緒,而是由執行緒池從中挑選一個空閒執行緒來執行任務。一個執行緒同時只能執行一個任務,可以同時向一個執行緒池提交多個任務。
執行緒池建立方法:
a、建立一個擁有固定執行緒數的執行緒池
ExecutorServicethreadPool = Executors.newFixedThreadPool(3);
b、建立一個快取執行緒池 執行緒池中的執行緒數根據任務多少自動增刪 動態變化
ExecutorServicethreadPool = Executors.newCacheThreadPool();
c、建立一個只有一個執行緒的執行緒池 與單執行緒一樣 但好處是保證池子裡有一個執行緒,
當執行緒意外死亡,會自動產生一個替補執行緒,始終有一個執行緒存活
ExecutorServicethreadPool = Executors.newSingleThreadExector();
往執行緒池中新增任務
threadPool.executor(Runnable)
關閉執行緒池:
threadPool.shutdown() 執行緒全部空閒,沒有任務就關閉執行緒池
threadPool.shutdownNow() 不管任務有沒有做完,都關掉
</pre><pre name="code" class="java"><span style="white-space:pre"> </span>//ExecutorService threadPool = Executors.newFixedThreadPool(3);
//ExecutorService threadPool = Executors.newCachedThreadPool();
ExecutorService threadPool = Executors.newSingleThreadExecutor();
for(int i=1;i<=10;i++){
final int task = i;
threadPool.execute(new Runnable(){
@Override
public void run() {
for(int j=1;j<=10;j++){
try {
Thread.sleep(20);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName() + " is looping of " + j + " for task of " + task);
}
}
});
}
System.out.println("all of 10 tasks have committed! ");
//threadPool.shutdownNow();//不管完成沒有
<span style="white-space:pre"> </span>threadPool.shutdown();//完成之後
用執行緒池啟動定時器:
a、建立排程執行緒池,提交任務 延遲指定時間後執行任務
Executors.newScheduledThreadPool(執行緒數).schedule(Runnable, 延遲時間,時間單位);
b、建立排程執行緒池,提交任務, 延遲指定時間執行任務後,間隔指定時間迴圈執行
Executors.newScheduledThreadPool(執行緒數).schedule(Runnable, 延遲時間,
間隔時間,時間單位);
所有的 schedule 方法都接受相對延遲和週期作為引數,而不是絕對的時間或日期。將以
Date
所表示的絕對時間轉換成要求的形式很容易。例如,要安排在某個以後的Date 執行,可以使用:schedule(task,date.getTime() - System.currentTimeMillis(),TimeUnit.MILLISECONDS)。
Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
newRunnable(){
@Override
publicvoid run() {
System.out.println("bombing!");
}},
6,
2,
TimeUnit.SECONDS);
}
10.Callable與Future的應用:獲取一個執行緒的執行結果
public interface Callable<V>
返回結果並且可能丟擲異常的任務。實現者定義了一個不帶任何引數的叫做call 的方法。 Callable 介面類似於 Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是Runnable 不會返回結果,並且無法丟擲經過檢查的異常。
只有一個方法Vcall
()
計算結果,如果無法計算結果,則丟擲一個Exception異常。
使用方法:
ExecutorServicethreadPool = Executors.newSingleThreadExccutor();
如果不需要返回結果,就用executor方法 呼叫submit方法返回一個Future物件
Future<T> future = threadPool.submit(new Callable<T>(){//接收一個Callable介面的例項物件
覆蓋Callable介面中的call方法,丟擲異常
publicTcall() throws Exception
{
ruturnT
}
});
獲取Future接收的結果
future。get();會丟擲異常
future.get()沒有拿到結果就會一直等待
Future取得的結果型別和Callable返回的結果型別必須一致,通過泛型實現。Callable要通過ExecutorService的submit方法提交,返回的Future物件可以取消任務。
public interface Future<V>
Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成後只能使用get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用Future 但又不提供可用的結果,則可以宣告Future<?> 形式型別、並返回 null 作為底層任務的結果。
方法摘要 |
|
boolean |
cancel(boolean mayInterruptIfRunning) 試圖取消對此任務的執行。 |
get() 如有必要,等待計算完成,然後獲取其結果。 |
|
get(long timeout,TimeUnit unit) 如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。 |
|
boolean |
isCancelled() 如果在任務正常完成前將其取消,則返回 true。 |
boolean |
isDone() 如果任務已完成,則返回 true。 |
public interface CompletionService<V>
CompletionService用於提交一組Callable任務,其take方法返回一個已完成的Callable任務對應的Future物件。好比同時種幾塊麥子等待收割,收割時哪塊先熟先收哪塊。
將生產新的非同步任務與使用已完成任務的結果分離開來的服務。生產者submit 執行的任務。使用者 take 已完成的任務,並按照完成這些任務的順序處理它們的結果。例如,CompletionService 可以用來管理非同步 IO ,執行讀操作的任務作為程式或系統的一部分提交,然後,當完成讀操作時,會在程式的不同部分執行其他操作,執行操作的順序可能與所請求的順序不同。
通常,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種情況下,CompletionService 只管理一個內部完成佇列。ExecutorCompletionService 類提供了此方法的一個實現。
CompletionService方法摘要 |
|
poll() 獲取並移除表示下一個已完成任務的 Future,如果不存在這樣的任務,則返回 null。 |
|
poll(long timeout,TimeUnit unit) 獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則將等待指定的時間(如果有必要)。 |
|
submit(Runnable task,V result) 提交要執行的 Runnable 任務,並返回一個表示任務完成的 Future,可以提取或輪詢此任務。 |
|
take() 獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。 |
|
ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue) |
示例:
ExecutorService threadPool = Executors.newFixedThreadPool(10); //建立執行緒池,傳遞給coms
用threadPool執行任務,執行的任務返回結果都是整數
CompletionService<Integer> coms = newExecutorCompletionService<Integer>(threadPool);
提交10個任務 種麥子
for (int i=0; i<10; i++)
{
finalint num = i+1;
coms.submit(newCallable<Integer>(){
public Integercall() 覆蓋call方法
{匿名內部類使用外部變數要用final修飾
SOP(任務+num);
Thread.sleep(newRandom().nextInt(6)*1000);
return num;
}
});
}
等待收穫 割麥子
for (int i=0; i<10; i++)
{ take獲取第一個Future物件,用get獲取結果
SOP(coms.take().get());
}
public class CallableAndFuture {
/**
* @param args
*/
publicstatic void main(String[] args) {
ExecutorServicethreadPool = Executors.newSingleThreadExecutor();
Future<String>future =
threadPool.submit(
newCallable<String>() {
publicString call() throws Exception {
Thread.sleep(2000);
return"hello";
};
}
);
System.out.println("等待結果");
try{
System.out.println("拿到結果:" + future.get());
}catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}catch (Exception e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
ExecutorServicethreadPool2 = Executors.newFixedThreadPool(10);
CompletionService<Integer>completionService = new ExecutorCompletionService<Integer>(threadPool2);
for(inti=1;i<=10;i++){
finalint seq = i;
completionService.submit(newCallable<Integer>() {
@Override
publicInteger call() throws Exception {
Thread.sleep(newRandom().nextInt(5000));
returnseq;
}
});
}
for(inti=0;i<10;i++){
try{
System.out.println(
completionService.take().get());
}catch (InterruptedException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}catch (ExecutionException e) {
//TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}
public class CallableAndFuture {
/**
* @param args
*/
public static void main(String[] args) {
ExecutorService threadPool = Executors.newSingleThreadExecutor();
Future<String> future =
threadPool.submit(
new Callable<String>() {
public String call() throws Exception {
Thread.sleep(2000);
return "hello";
};
}
);
System.out.println("等待結果");
try {
System.out.println("拿到結果:" + future.get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2);
for(int i=1;i<=10;i++){
final int seq = i;
completionService.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
Thread.sleep(new Random().nextInt(5000));
return seq;
}
});
}
for(int i=0;i<10;i++){
try {
System.out.println(
completionService.take().get());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
}