1. 程式人生 > >定製在排程執行緒池中執行的任務

定製在排程執行緒池中執行的任務

Java 9併發程式設計指南 目錄

定製在排程執行緒池中執行的任務

排程執行緒池是Executor框架基本執行緒池的擴充套件,排程在一段時間後執行任務。ScheduledThreadPoolExecutor類實現此執行緒池,並且允許如下兩種任務的執行:

  • **延遲任務:**在一段時間後只執行一次的任務
  • **週期任務:**在延遲之後且持續週期性執行的任務

延遲任務能夠執行Callable和Runnable物件,但週期任務只能執行Runnable物件。所有通過排程執行緒池執行的任務都是RunnableScheduledFuture介面實現。本節將學習如何實現自定義RunnableScheduledFuture介面,來執行延遲和週期任務。

準備工作

本範例通過Eclipse開發工具實現。如果使用諸如NetBeans的開發工具,開啟並建立一個新的Java專案。

實現過程

通過如下步驟實現範例:

  1. 建立名為MyScheduledTask的類,此類由名為V的泛型型別引數化,繼承FutureTask類且實現RunnableScheduledFuture介面:

    public class MyScheduledTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
    
  2. 宣告名為task的私有RunnableScheduledFuture屬性:

    	private RunnableScheduledFuture<V> task;
    
  3. 宣告名為executor的私有ScheduledThreadPoolExecutor類:

    	private ScheduledThreadPoolExecutor executor;
    
  4. 宣告名為period的私有long屬性:

    	private long period;
    
  5. 宣告名為startDate的私有long屬性:

    	private long startDate;
    
  6. 實現類建構函式,接收任務將要執行的Runnable物件,任務返回的結果,建立MyScheduledTask物件的RunnableScheduledFuture任務,和將要執行任務的ScheduledThreadPoolExecutor物件。呼叫其父類建構函式,並存儲task和executor屬性:

    	public MyScheduledTask(Runnable runnable, V result, RunnableScheduledFuture<V> task, ScheduledThreadPoolExecutor executor) {
    		super(runnable, result);
    		this.task=task;
    		this.executor=executor;
    	}
    
  7. 實現getDelay()方法,如果任務是週期的且startDate屬性值大於零,計算實際時間和startDate的差值作為返回值。否則返回task屬性中儲存的初始任務延遲時間。切記必須在作為引數傳遞的時間單元中返回結果:

    	@Override
    	public long getDelay(TimeUnit unit) {
    		if (!isPeriodic()) {
    			return task.getDelay(unit);
    		} else {
    			if (startDate==0){
    				return task.getDelay(unit);
    			} else {
    				Date now=new Date();
    				long delay=startDate-now.getTime();
    				return unit.convert(delay, TimeUnit.MILLISECONDS);
    			}
    		}
    	}
    
  8. 實現compareTo()方法,呼叫初始任務的compareTo()方法:

    	@Override
    	public int compareTo(Delayed o) {
    		return task.compareTo(o);
    	}
    
  9. 實現isPerodic()方法,呼叫初始任務的isPerodic()方法:

    	@Override
    	public boolean isPeriodic() {
    		return task.isPeriodic();
    	}
    
  10. 實現run()方法,如果是週期任務,則必須使用任務下次執行的開始時間更新startDate屬性,屬性值為實際時間和週期的總和。然後再次把任務新增到ScheduledThreadPoolExecutor物件佇列中:

    	@Override
    	public void run() {
    		if (isPeriodic() && (!executor.isShutdown())) {
    			Date now=new Date();
    			startDate=now.getTime()+period;
    			executor.getQueue().add(this);
    		}
    
  11. 輸出實際時間到控制檯,呼叫runAndReset()方法執行此任務,然後輸出再次執行的實際時間到控制檯:

    		System.out.printf("Pre-MyScheduledTask: %s\n",new Date());
    		System.out.printf("MyScheduledTask: Is Periodic: %s\n",
    		isPeriodic());
    		super.runAndReset();
    		System.out.printf("Post-MyScheduledTask: %s\n",new Date());
    	}
    
  12. 實現setPeriod()方法設定任務週期:

    	public void setPeriod(long period) {
    		this.period=period;
    	}
    }
    
  13. 建立名為MyScheduledThreadPoolExecutor的類,實現執行MyScheduledTask任務的ScheduledThreadPoolExecutor物件。指定此類繼承ScheduledThreadPoolExecutor類:

    public class MyScheduledThreadPoolExecutor extends ScheduledThreadPoolExecutor{
    
  14. 實現只調用父類建構函式的類建構函式:

    	public MyScheduledThreadPoolExecutor(int corePoolSize) {
    		super(corePoolSize);
    	}
    
  15. 實現decorateTask()方法,將待執行的Runnable物件和執行此物件的RunnableScheduledFuture任務作為引數接收,建立並返回MyScheduledTask任務,使用這些物件來構造它們:

    	@Override
    	protected <V> RunnableScheduledFuture<V> decorateTask(Runnable runnable, RunnableScheduledFuture<V> task) {
    		MyScheduledTask<V> myTask=new MyScheduledTask<V>(runnable, null, task,this);
    		return myTask;
    	}
    
  16. 重寫scheduledAtFixedRate()方法,呼叫其父類方法,將返回物件轉換成MyScheduledTask物件,使用setPeriod()方法設定任務的週期:

    	@Override
    	public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
    		ScheduledFuture<?> task= super.scheduleAtFixedRate(command, initialDelay, period, unit);
    		MyScheduledTask<?> myTask=(MyScheduledTask<?>)task;
    		myTask.setPeriod(TimeUnit.MILLISECONDS.convert(period,unit));
    		return task;
    	}
    }
    
  17. 建立名為Task的類,實現Runnable介面:

    public class Task implements Runnable {
    
  18. 實現run()方法,輸出任務啟動資訊,設定當前執行緒休眠2秒鐘,然後輸出任務結束資訊:

    	@Override
    	public void run() {
    	System.out.printf("Task: Begin.\n");
    		try {
    			TimeUnit.SECONDS.sleep(2);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		System.out.printf("Task: End.\n");
    	}
    }
    
  19. 通過建立名為Main的類,新增main()方法,實現本範例主類:

    public class Main {
    	public static void main(String[] args)  throws Exception{
    
  20. 建立名為executor的MyScheduledThreadPoolExecutor物件,使用4作為引數,池中包含兩個執行緒:

    		MyScheduledThreadPoolExecutor executor=new MyScheduledThreadPoolExecutor(4);
    
  21. 建立名為task的Task物件,輸出實際時間到控制檯:

    		Task task=new Task();
    		System.out.printf("Main: %s\n",new Date());
    
  22. 使用schedule()方法傳送延遲任務到執行器,此任務將延遲1秒後執行:

    		executor.schedule(task, 1, TimeUnit.SECONDS);
    
  23. 設定主執行緒休眠3秒鐘:

    		TimeUnit.SECONDS.sleep(3);
    
  24. 建立另一個Task物件,再次輸出實際時間到控制檯:

    		task=new Task();
    		System.out.printf("Main: %s\n",new Date());
    
  25. 使用scheduleAtFixedRate()方法傳送週期任務到執行器,此任務將延遲1秒後執行,且每3秒鐘執行一次:

    		executor.scheduleAtFixedRate(task, 1, 3, TimeUnit.SECONDS);
    
  26. 設定主執行緒休眠10秒鐘:

    		TimeUnit.SECONDS.sleep(10);
    
  27. 使用shutdown()方法關閉執行器,使用awaitTermination()方法等待執行器結束:

    		executor.shutdown();
    		executor.awaitTermination(1, TimeUnit.DAYS);
    
  28. 輸出指明程式結束的資訊到控制檯:

    		System.out.printf("Main: End of the program.\n");
    	}
    }
    

工作原理

在本節中,實現了MyScheduledTask類,用來實現在ScheduledThreadPoolExecutor執行器上執行的定製化任務。此類繼承FutureTask類和實現RunnableScheduledFuture介面。它實現RunnableScheduledFuture介面是因為所有在排程執行器中執行的任務必須實現此介面且繼承FutureTask類。這是因為此類正確地實現了RunnableScheduledFuture介面中宣告的方法。之前提到的所有介面和類都是引數化類,它們具有通過任務返回的資料型別。

為了在排程執行器中使用MyScheduledTask任務,重寫了MyScheduledThreadPoolExecutor類的decorateTask()方法。此類繼承ScheduledThreadPoolExecutor執行器,並且此方法將ScheduledThreadPoolExecutor執行器實現的預設排程任務轉換成MyScheduledTask任務。所以,當實現自定義的排程任務時,還需要實現自定義排程執行器。

decorateTask()方法建立新的MyScheduledTask物件,包含四個引數。第一個引數是任務中待執行的Runnable物件;第二個引數是通過任務待返回的物件;第三個引數是執行緒池中待替換的新物件;最後一個物件是將要執行任務的執行器。本範例中,使用this關鍵字來引用正在建立任務的執行器。

MyScheduledTask類可以執行延遲和週期任務,通過實現getDelay()和run()方法,包含必要的邏輯操作來執行這兩種任務。

排程執行器通過呼叫getDelay()方法知道是否需要執行任務,此方法特性在延遲和週期任務中改變。如前所述,MyScheduledClass類建構函式接收準備執行 Runnable物件的初始ScheduledRunnableFuture物件,且儲存此物件為類屬性,用來訪問類方法和資料。當執行延遲任務時,getDelay()方法返回初始任務的延遲時間。但處理週期任務時,getDelay()方法返回任務執行的實際時間和startDate的差值。

run()方法是執行任務的方法,週期任務特點是,如果希望再次執行任務,則必須將下一個執行任務作為新任務放在執行器的佇列中。所以如果執行週期任務,則設定startDate屬性值為任務執行的實際時間和區間,並將任務再次儲存在執行器的佇列中。startDate屬性儲存下一個任務開始執行時間,然後使用FutureTask類提供的runAndReset()方法執行任務。處理延遲任務不需要將任務放置到執行器佇列中,因為延遲任務只執行一次。

還需要注意執行器是否已經關閉。如果是,則不需要再次將週期任務儲存到執行器佇列中。

最後,重寫MyScheduledThreadPoolExecutor類的scheduleAtFixedRate()方法。我們之前提到對於週期任務,使用任務週期設定startDate屬性值,但是我們還沒有初始化週期。所以需要重寫此方法,將週期作為引數接收,將週期傳給MyScheduledTask類才能使用。

本範例完成了實現Runnable介面的任務類,它是在排程執行器中執行的任務。範例主類建立MyScheduledThreadPoolExecutor執行器,且返回如下兩個任務:

  • 延遲任務,在實際時間1秒後執行
  • 週期認為,在實際時間1秒後執行,然後每個3秒執行一次

下圖顯示本範例在控制檯輸出的執行資訊,可以檢查是否正確執行這兩種任務:

pics/08_04.jpg

擴充套件學習

ScheduledThreadPoolExecutor類提供decorateTask()方法的另一個版本,此方法將Callable物件代替Runnable物件作為引數接收。

更多關注

  • 第四章“執行緒執行器”中的“執行器中延遲執行任務”和“執行器中週期執行任務”小節