1. 程式人生 > >Java併發庫(九、十):執行緒池、Callable、Future

Java併發庫(九、十):執行緒池、Callable、Future

深切懷念傳智播客張孝祥老師,特將其代表作——Java併發庫視訊研讀兩遍,受益頗豐,記以後閱

http://blog.csdn.net/mmc_maodun/article/category/1773509

09.java5執行緒併發庫的應用

       如果沒有執行緒池,需要在run方法中不停判斷,還有沒有任務需要執行

       執行緒池的通俗比喻:接待客戶,為每個客戶都安排一個工作人員,接待完成後該工作人員就廢掉。伺服器每收到一個客戶請求就為其分配一個執行緒提供服務,服務結束後銷燬執行緒,不斷建立、銷燬執行緒,影響效能。

       執行緒池:先建立多個執行緒放線上程池中,當有任務需要執行時,從執行緒池中找一個空閒執行緒執行任務,任務完成後,並不銷燬執行緒,而是返回執行緒池,等待新的任務安排。

       執行緒池程式設計中,任務是提交給整個執行緒池的,並不是提交給某個具體的執行緒,而是由執行緒池從中挑選一個空閒執行緒來執行任務。一個執行緒同時只能執行一個任務,可以同時向一個執行緒池提交多個任務。

執行緒池建立方法:

a、建立一個擁有固定執行緒數的執行緒池

       ExecutorServicethreadPool = Executors.newFixedThreadPool(3);   

       b、建立一個快取執行緒池     執行緒池中的執行緒數根據任務多少自動增刪 動態變化

       ExecutorServicethreadPool = Executors.newCacheThreadPool();

       c、建立一個只有一個執行緒的執行緒池  與單執行緒一樣  但好處是保證池子裡有一個執行緒,

當執行緒意外死亡,會自動產生一個替補執行緒,始終有一個執行緒存活

       ExecutorServicethreadPool = Executors.newSingleThreadExector();

往執行緒池中新增任務

       threadPool.executor(Runnable)

關閉執行緒池:

       threadPool.shutdown()   執行緒全部空閒,沒有任務就關閉執行緒池

       threadPool.shutdownNow()  不管任務有沒有做完,都關掉

</pre><pre name="code" class="java"><span style="white-space:pre">		</span>//ExecutorService threadPool = Executors.newFixedThreadPool(3);
		//ExecutorService threadPool = Executors.newCachedThreadPool();
		ExecutorService threadPool = Executors.newSingleThreadExecutor();
		for(int i=1;i<=10;i++){
			final int task = i;
			threadPool.execute(new Runnable(){
				@Override
				public void run() {
					for(int j=1;j<=10;j++){
						try {
							Thread.sleep(20);
						} catch (InterruptedException e) {
							// TODO Auto-generated catch block
							e.printStackTrace();
						}
						System.out.println(Thread.currentThread().getName() + " is looping of " + j + " for  task of " + task);
					}
				}
			});
		}
		System.out.println("all of 10 tasks have committed! ");
		//threadPool.shutdownNow();//不管完成沒有
<span style="white-space:pre">		</span>threadPool.shutdown();//完成之後



用執行緒池啟動定時器:

       a、建立排程執行緒池,提交任務         延遲指定時間後執行任務

       Executors.newScheduledThreadPool(執行緒數).schedule(Runnable, 延遲時間,時間單位);

       b、建立排程執行緒池,提交任務, 延遲指定時間執行任務後,間隔指定時間迴圈執行

       Executors.newScheduledThreadPool(執行緒數).schedule(Runnable, 延遲時間,

間隔時間,時間單位);

       所有的 schedule 方法都接受相對延遲和週期作為引數,而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉換成要求的形式很容易。例如,要安排在某個以後的Date 執行,可以使用:schedule(task,date.getTime() - System.currentTimeMillis(),TimeUnit.MILLISECONDS)

              Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
                            newRunnable(){
                                   @Override
                            publicvoid run() {
                                   System.out.println("bombing!");
                                  
                            }},
                            6,
                            2,
                            TimeUnit.SECONDS);
       }


10.CallableFuture的應用:獲取一個執行緒的執行結果

public interface Callable<V>

返回結果並且可能丟擲異常的任務。實現者定義了一個不帶任何引數的叫做call 的方法。 Callable 介面類似於 Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是Runnable 不會返回結果,並且無法丟擲經過檢查的異常。

只有一個方法Vcall() 計算結果,如果無法計算結果,則丟擲一個Exception異常。

使用方法:

       ExecutorServicethreadPool = Executors.newSingleThreadExccutor();

       如果不需要返回結果,就用executor方法 呼叫submit方法返回一個Future物件

       Future<T> future = threadPool.submit(new Callable<T>(){//接收一個Callable介面的例項物件

覆蓋Callable介面中的call方法,丟擲異常

                     publicTcall() throws Exception

                     {

                            ruturnT

}

});

獲取Future接收的結果

future。get();會丟擲異常

future.get()沒有拿到結果就會一直等待

       Future取得的結果型別和Callable返回的結果型別必須一致,通過泛型實現。Callable要通過ExecutorService的submit方法提交,返回的Future物件可以取消任務。

public interface Future<V>

Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成後只能使用get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用Future 但又不提供可用的結果,則可以宣告Future<?> 形式型別、並返回 null 作為底層任務的結果。

方法摘要

 boolean

cancel(boolean mayInterruptIfRunning)           試圖取消對此任務的執行。

 V

get()           如有必要,等待計算完成,然後獲取其結果。

 V

get(long timeout,TimeUnit unit)           如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。

 boolean

isCancelled()           如果在任務正常完成前將其取消,則返回 true。

 boolean

isDone()           如果任務已完成,則返回 true。

public interface CompletionService<V>

       CompletionService用於提交一組Callable任務,其take方法返回一個已完成的Callable任務對應的Future物件。好比同時種幾塊麥子等待收割,收割時哪塊先熟先收哪塊。

將生產新的非同步任務與使用已完成任務的結果分離開來的服務。生產者submit 執行的任務。使用者 take 已完成的任務,並按照完成這些任務的順序處理它們的結果。例如,CompletionService 可以用來管理非同步 IO ,執行讀操作的任務作為程式或系統的一部分提交,然後,當完成讀操作時,會在程式的不同部分執行其他操作,執行操作的順序可能與所請求的順序不同。

通常,CompletionService 依賴於一個單獨的 Executor 來實際執行任務,在這種情況下,CompletionService 只管理一個內部完成佇列。ExecutorCompletionService 類提供了此方法的一個實現。

CompletionService方法摘要

poll()           獲取並移除表示下一個已完成任務的 Future,如果不存在這樣的任務,則返回 null。

poll(long timeout,TimeUnit unit)          獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則將等待指定的時間(如果有必要)。

submit(Callable<V> task)           提交要執行的值返回任務,並返回表示掛起的任務結果的 Future。

submit(Runnable task,V result)           提交要執行的 Runnable 任務,並返回一個表示任務完成的 Future,可以提取或輪詢此任務。

take()           獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。

ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue)
          使用為執行基本任務而提供的執行程式建立一個 ExecutorCompletionService,並將所提供的佇列作為其完成佇列。

示例:

ExecutorService threadPool = Executors.newFixedThreadPool(10);   //建立執行緒池,傳遞給coms

       用threadPool執行任務,執行的任務返回結果都是整數

CompletionService<Integer> coms = newExecutorCompletionService<Integer>(threadPool);

       提交10個任務  種麥子

for (int i=0; i<10; i++)

{

       finalint num = i+1;

coms.submit(newCallable<Integer>(){

public Integercall()       覆蓋call方法

{匿名內部類使用外部變數要用final修飾

       SOP(任務+num);

       Thread.sleep(newRandom().nextInt(6)*1000);

       return num;

}

});

}

       等待收穫       割麥子

for (int i=0; i<10; i++)

{     take獲取第一個Future物件,用get獲取結果

       SOP(coms.take().get());

}

public class CallableAndFuture {

       /**

        * @param args

        */

       publicstatic void main(String[] args) {

              ExecutorServicethreadPool = Executors.newSingleThreadExecutor();

              Future<String>future =

                     threadPool.submit(

                            newCallable<String>() {

                                   publicString call() throws Exception {

                                          Thread.sleep(2000);

                                          return"hello";

                                   };

                            }

              );

              System.out.println("等待結果");

              try{

                     System.out.println("拿到結果:" + future.get());

              }catch (InterruptedException e) {

                     //TODO Auto-generated catch block

                     e.printStackTrace();

              }catch (Exception e) {

                     //TODO Auto-generated catch block

                     e.printStackTrace();

              }

              ExecutorServicethreadPool2 = Executors.newFixedThreadPool(10);

              CompletionService<Integer>completionService = new ExecutorCompletionService<Integer>(threadPool2);

              for(inti=1;i<=10;i++){

                     finalint seq = i;

                     completionService.submit(newCallable<Integer>() {

                            @Override

                            publicInteger call() throws Exception {

                                   Thread.sleep(newRandom().nextInt(5000));

                                   returnseq;

                            }

                     });

              }

              for(inti=0;i<10;i++){

                     try{

                            System.out.println(

                                          completionService.take().get());

                     }catch (InterruptedException e) {

                            //TODO Auto-generated catch block

                            e.printStackTrace();

                     }catch (ExecutionException e) {

                            //TODO Auto-generated catch block

                            e.printStackTrace();

                     }

              }

       }

}

public class CallableAndFuture {

	/**
	 * @param args
	 */
	public static void main(String[] args) {
		ExecutorService threadPool =  Executors.newSingleThreadExecutor();
		Future<String> future =
			threadPool.submit(
				new Callable<String>() {
					public String call() throws Exception {
						Thread.sleep(2000);
						return "hello";
					};
				}
		);
		System.out.println("等待結果");
		try {
			System.out.println("拿到結果:" + future.get());
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		
		ExecutorService threadPool2 =  Executors.newFixedThreadPool(10);
		CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2);
		for(int i=1;i<=10;i++){
			final int seq = i;
			completionService.submit(new Callable<Integer>() {
				@Override
				public Integer call() throws Exception {
					Thread.sleep(new Random().nextInt(5000));
					return seq;
				}
			});
		}
		for(int i=0;i<10;i++){
			try {
				System.out.println(
						completionService.take().get());
			} catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			} catch (ExecutionException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	

}