1. 程式人生 > >Java多執行緒程式設計---併發框架Executor

Java多執行緒程式設計---併發框架Executor

        我們都知道,在JDK1.5之前,Java中要進行業務併發時,通常需要有程式設計師獨立完成程式碼實現,而當針對高質量Java多執行緒併發程式設計時,為防止死鎖等現象的出現,比如使用java之前的wait()、notify()和synchronized等,每每需要考慮效能、死鎖、公平性、資源管理以及如何避免執行緒安全性方面帶來的危害等諸多因素,往往會採用一些較為複雜的安全策略,加重了程式設計師的開發負擔。

        萬幸的是,在 Java 5.0 提供了java.util.concurrent(簡稱JUC )包,在此包中增加了在併發程式設計中很常用的實用工具類,用於定義類似於執行緒的自定義子系統,包括執行緒池、非同步 IO 和輕量級任務框架。提供可調的、靈活的執行緒池。還提供了設計用於多執行緒上下文中的 Collection 實現等。開發者們藉助於此,將有效的減少競爭條件(race conditions)和死鎖執行緒等。concurrent包很好的解決了這些問題,為我們提供了更實用的併發程式模型。

JUC包簡介

        java.util.concurrent下主要的介面和類:

        1、Executor:具體Runnable任務的執行者。

      2、ExecutorService:一個執行緒池管理者,其實現類有多種,比如普通執行緒池,定時排程執行緒池ScheduledExecutorService等,我們能把一個Runnable,Callable提交到池中讓其排程。

        3、Callable:返回結果並且可能丟擲異常的任務。實現者定義了一個不帶任何引數的叫做 call 的方法。

        4、Future:是與Runnable,Callable進行互動的介面,比如一個執行緒執行結束後取返回的結果等等,還提供了cancel終止執行緒。

        5、BlockingQueue:阻塞佇列。

Runnable與Callable

        public interface Runnable

        Runnable 介面應該由那些打算通過某一執行緒執行其例項的類來實現。類必須定義一個稱為 run 的無引數方法。

        設計該介面的目的是為希望在活動時執行程式碼的物件提供一個公共協議。例如,Thread 類實現了 Runnable。start的意思是說某個執行緒已啟動並且尚未停止。

        此外,Runnable 為非 Thread 子類的類提供了一種啟用方式。通過例項化某個 Thread 例項並將自身作為執行目標,就可以執行實現

Runnable 的類而無需建立 Thread 的子類。大多數情況下,如果只想重寫 run() 方法,而不重寫其他 Thread 方法,那麼應使用 Runnable 介面。這很重要,因為除非程式設計師打算修改或增強類的基本行為,否則不應為該類建立子類。

        方法:void run(),使用實現介面 Runnable 的物件建立一個執行緒時,啟動該執行緒將導致在獨立執行的執行緒中呼叫物件的 run 方法。方法 run 的常規協定是,它可能執行任何所需的動作。

        public interface Callable<V>

        返回結果並且可能丟擲異常的任務。實現者定義了一個不帶任何引數的叫做 call 的方法。

        Callable 介面類似於 Runnable,兩者都是為那些其例項可能被另一個執行緒執行的類設計的。但是 Runnable 不會返回結果,並且無法丟擲經過檢查的異常。

Executors 類包含一些從其他普通形式轉換成 Callable 類的實用方法。

         方法:V call() throws Exception,計算結果,如果無法計算結果,則丟擲一個異常。返回:計算的結果。

Future

        public interface Future<V>

        Future 表示非同步計算的結果。它提供了檢查計算是否完成的方法,以等待計算的完成,並獲取計算的結果。計算完成後只能使用 get 方法來獲取結果,如有必要,計算完成前可以阻塞此方法。取消則由 cancel 方法來執行。還提供了其他方法,以確定任務是正常完成還是被取消了。一旦計算完成,就不能再取消計算。如果為了可取消性而使用 Future 但又不提供可用的結果,則可以宣告Future<?> 形式型別、並返回 null 作為底層任務的結果。

        方法摘要

        1、boolean cancel(booleanmayInterruptIfRunning):試圖取消對此任務的執行。

        2、V get():如有必要,等待計算完成,然後獲取其結果。

        3、V get(longtimeout, TimeUnit unit):如有必要,最多等待為使計算完成所給定的時間之後,獲取其結果(如果結果可用)。

        4、booleanisCancelled():如果在任務正常完成前將其取消,則返回 true。

        5、booleanisDone():如果任務已完成,則返回 true。

Executor

        public interface Executor

        執行已提交的 Runnable 任務的物件。此介面提供一種將任務提交與每個任務將如何執行的機制(包括執行緒使用的細節、排程等)分離開來的方法。通常使用 Executor 而不是顯式地建立執行緒。例如,可能會使用以下方法,而不是為一組任務中的每個任務呼叫 newThread(new(RunnableTask())).start():

        Executorexecutor = anExecutor;

        executor.execute(newRunnableTask1());

        executor.execute(newRunnableTask2());

        ...

        不過,Executor 介面並沒有嚴格地要求執行是非同步的。在最簡單的情況下,執行程式可以在呼叫者的執行緒中立即執行已提交的任務:

class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
        更常見的是,任務是在某個不是呼叫者執行緒的執行緒中執行的。以下執行程式將為每個任務生成一個新執行緒。
class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}

        許多 Executor 實現都對排程任務的方式和時間強加了某種限制。以下執行程式使任務提交與第二個執行程式保持連續,這說明了一個複合執行程式。

class SerialExecutor implements Executor {
    final Queue<Runnable> tasks = new ArrayDeque<Runnable>();
    final Executor executor;
    Runnable active;

    SerialExecutor(Executor executor) {
        this.executor = executor;
    }

    public synchronized void execute(final Runnable r) {
        tasks.offer(new Runnable() {
            public void run() {
                try {
                     r.run();
                } finally {
                     scheduleNext();
                }
             }
         });
         if (active == null) {
             scheduleNext();
         }
     }

     protected synchronized void scheduleNext() {
         if ((active = tasks.poll()) != null) {
             executor.execute(active);
         }
     }
}

        java.util.concurrent包中提供的 Executor 實現實現了ExecutorService,這是一個使用更廣泛的介面。ThreadPoolExecutor 類提供一個可擴充套件的執行緒池實現。Executors 類為這些 Executor 提供了便捷的工廠方法。

        方法:void execute(Runnable command)。在未來某個時間執行給定的命令。該命令可能在新的執行緒、已入池的執行緒或者正呼叫的執行緒中執行,這由 Executor 實現決定。引數:command - 可執行的任務。

ExecutorService

        public interface ExecutorService extendsExecutor

        Executor 提供了管理終止的方法,以及可為跟蹤一個或多個非同步任務執行狀況而生成 Future 的方法。

        可以關閉 ExecutorService,這將導致其拒絕新任務。提供兩個方法來關閉ExecutorService。shutdown()方法在終止前允許執行以前提交的任務,而 shutdownNow()方法阻止等待任務啟動並試圖停止當前正在執行的任務。在終止時,執行程式沒有任務在執行,也沒有任務在等待執行,並且無法提交新任務。應該關閉未使用的 ExecutorService 以允許回收其資源。

    通過建立並返回一個可用於取消執行和/或等待完成的 Future,方法 submit 擴充套件了基本方法Executor.execute(java.lang.Runnable)。方法 invokeAny 和 invokeAll是批量執行的最常用形式,它們執行任務collection,然後等待至少一個,或全部任務完成(可使用 ExecutorCompletionService 類來編寫這些方法的自定義變體)。

        Executors 類提供了用於此包中所提供的執行程式服務的工廠方法。

Executors

        public class Executors extends Object

        此包中所定義的 Executor、ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 類的工廠和實用方法。此類支援以下各種方法:

        1、建立並返回設定有常用配置字串的ExecutorService 的方法。

        2、建立並返回設定有常用配置字串的ScheduledExecutorService 的方法。

        3、建立並返回“包裝的”ExecutorService方法,它通過使特定於實現的方法不可訪問來禁用重新配置。

        4、建立並返回ThreadFactory 的方法,它可將新建立的執行緒設定為已知的狀態。

        5、建立並返回非閉包形式的Callable 的方法,這樣可將其用於需要 Callable 的執行方法中。

建立執行緒池

        如果沒有執行緒池,需要在run方法中不停判斷,還有沒有任務需要執行。

        執行緒池的通俗比喻:接待客戶,為每個客戶都安排一個工作人員,接待完成後該工作人員就廢掉。伺服器每收到一個客戶請求就為其分配一個執行緒提供服務,服務結束後銷燬執行緒,不斷建立、銷燬執行緒,影響效能。

        執行緒池:先建立多個執行緒放線上程池中,當有任務需要執行時,從執行緒池中找一個空閒執行緒執行任務,任務完成後,並不銷燬執行緒,而是返回執行緒池,等待新的任務安排。

        執行緒池程式設計中,任務是提交給整個執行緒池的,並不是提交給某個具體的執行緒,而是由執行緒池從中挑選一個空閒執行緒來執行任務。一個執行緒同時只能執行一個任務,可以同時向一個執行緒池提交多個任務。

        1、執行緒池建立方法:

        a、建立一個擁有固定執行緒數的執行緒池

        ExecutorService threadPool =Executors.newFixedThreadPool(3);

        b、建立一個快取執行緒池,執行緒池中的執行緒數根據任務多少自動增刪,動態變化

        ExecutorService threadPool =Executors.newCacheThreadPool();

        c、建立一個只有一個執行緒的執行緒池,與單執行緒一樣,但好處是保證池子裡有一個執行緒,當執行緒意外死亡,會自動產生一個替補執行緒,始終有一個執行緒存活

        ExecutorService threadPool =Executors.newSingleThreadExector();

        2、往執行緒池中新增任務:

        threadPool.executor(Runnable)

        3、關閉執行緒池:

        threadPool.shutdown():執行緒全部空閒,沒有任務就關閉執行緒池

        threadPool.shutdownNow():不管任務有沒有做完,都關掉

        4、用執行緒池啟動定時器:

        a、建立排程執行緒池,提交任務,延遲指定時間後執行任務

        Executors.newScheduledThreadPool(執行緒數).schedule(Runnable, 延遲時間,時間單位);

        b、建立排程執行緒池,提交任務,延遲指定時間執行任務後,間隔指定時間迴圈執行

        Executors.newScheduledThreadPool(執行緒數).schedule(Runnable, 延遲時間,間隔時間, 時間單位);

        所有的 schedule 方法都接受相對延遲和週期作為引數,而不是絕對的時間或日期。將以 Date 所表示的絕對時間轉換成要求的形式很容易。例如,要安排在某個以後的 Date執行,可以使用:schedule(task,date.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS)。

ExecutorService生命週期

        ExecutorService擴充套件了Executor並添加了一些生命週期管理的方法。一個Executor的生命週期有三種狀態,執行,關閉,終止。Executor建立時處於執行狀態。當呼叫ExecutorService.shutdown()後,處於關閉狀態,isShutdown()方法返回true。這時,不應該再想Executor中新增任務,所有已新增的任務執行完畢後,Executor處於終止狀態,isTerminated()返回true。如果Executor處於關閉狀態,往Executor提交任務會丟擲unchecked exception RejectedExecutionException。

執行緒池例項

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import org.junit.Test;

/**
 * 
 * @Description: 建立執行緒池的集中方法
 *
 * @author: zxt
 *
 * @time: 2018年4月8日 下午7:44:28
 *
 */
public class ThreadPoolTest {

	public static void main(String[] args) {
		// 延遲delay時間後執行
		Executors.newScheduledThreadPool(3).schedule(
				new Runnable() {
					@Override
					public void run() {
						System.out.println(Thread.currentThread().getName() + " bombing!!");
					}
				}, 
				3, 
				TimeUnit.SECONDS);
		
		// 固定頻率的定時器
		Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
				new Runnable() {
					@Override
					public void run() {
						System.out.println(Thread.currentThread().getName() + " bombing!!");
					}
				}, 
				4, 
				2,
				TimeUnit.SECONDS);
	}
	
	/**
	 * 
	 * @Description:固定大小的執行緒池
	 *
	 */
	@Test
	public void fixedThreadPoolTest() {
		// 建立一個固定大小的執行緒池
		ExecutorService threadPool = Executors.newFixedThreadPool(3);
		// 往執行緒池中新增10個任務
		for (int i = 1; i <= 10; i++) {
			final int task = i;
			threadPool.execute(new Runnable() {

				@Override
				public void run() {
					for (int j = 1; j <= 5; j++) {
						System.out.println(
								Thread.currentThread().getName() + " is looping of " + j + " for task " + task);
					}

				}
			});
		}
		
		// 當執行緒池中的所有執行緒都沒有任務在執行時,關閉執行緒池
		threadPool.shutdown();
	}
	
	/**
	 * 
	 * @Description:可變大小的執行緒池
	 *
	 */
	@Test
	public void cachedThreadPoolTest() {
		// 建立一個可變大小的執行緒池
		ExecutorService threadPool = Executors.newCachedThreadPool();
		// 往執行緒池中新增10個任務
		for (int i = 1; i <= 10; i++) {
			final int task = i;
			threadPool.execute(new Runnable() {

				@Override
				public void run() {
					for (int j = 1; j <= 5; j++) {
						System.out.println(
								Thread.currentThread().getName() + " is looping of " + j + " for task " + task);
					}

				}
			});
		}
		
		// 當執行緒池中的所有執行緒都沒有任務在執行時,關閉執行緒池
		threadPool.shutdown();
	}
	
	/**
	 * 
	 * @Description:單執行緒執行緒池(和單執行緒的區別在於,池中始終能保持有一個執行緒在)
	 *
	 */
	@Test
	public void singleThreadExecutorTest() {
		// 建立一個可變大小的執行緒池
		ExecutorService threadPool = Executors.newSingleThreadExecutor();
		// 往執行緒池中新增10個任務
		for (int i = 1; i <= 10; i++) {
			final int task = i;
			threadPool.execute(new Runnable() {

				@Override
				public void run() {
					for (int j = 1; j <= 5; j++) {
						System.out.println(
								Thread.currentThread().getName() + " is looping of " + j + " for task " + task);
					}

				}
			});
		}
		
		// 當執行緒池中的所有執行緒都沒有任務在執行時,關閉執行緒池
		threadPool.shutdown();
	}
}

Callable與Future的應用

        獲取一個執行緒的執行結果。

        ExecutorService threadPool =Executors.newSingleThreadExccutor();  如果不需要返回結果,就用executor方法,呼叫submit方法返回一個Future物件。

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CallableAndFuture {

	public static void main(String[] args) {
		ExecutorService threadPool = Executors.newSingleThreadExecutor();
		// 接收一個Callable介面的例項物件, 返回一個Future物件
		Future<String> future = threadPool.submit(new Callable<String>() {
			
			// 覆蓋Callable介面中的call方法,丟擲異常
			@Override
			public String call() throws Exception {
				Thread.sleep(2000);
				return "hello!";
			}
		});

		System.out.println("等待結果");
		try {
			System.out.println("拿到結果:" + future.get());

		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

        獲取Future接收的結果:future.get();會丟擲異常, future.get()沒有拿到結果就會一直等待。Future取得的結果型別和Callable返回的結果型別必須一致,通過泛型實現。Callable要通過ExecutorService的submit方法提交,返回的Future物件可以取消任務。

例子:平行計算陣列的和。

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * 
 * @Description: 平行計算陣列的和
 *
 * @author: zxt
 *
 * @time: 2018年4月8日 下午8:19:28
 *
 */
public class ConcurrentCalculator {
	
	private ExecutorService exec;
	private int cpuCoreNumber;
	private List<Future<Long>> tasks = new ArrayList<Future<Long>>();
	
	public ConcurrentCalculator() {
		// 得到cpu核心的個數
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		// 有幾個cpu,則建立有幾個執行緒的執行緒池
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
	}
	
	public void close() {
        exec.shutdown();
    }
	
	public static void main(String[] args) {
		int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };
		ConcurrentCalculator calc = new ConcurrentCalculator();
		Long sum = calc.sum(numbers);
		System.out.println(sum);
		calc.close();
	}

	// 內部類,實現Callable介面,將其例項提交給Executor可執行
	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			
			return sum;
		}
	}


	public Long sum(final int[] numbers) {
		// 根據CPU核心個數拆分任務,建立FutureTask並提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			// 將陣列分成多端,使用多個任務計算
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length) {
				end = numbers.length;
			}
			
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			// FutureTask<V>實現了Future<V>和Runable<V>
			FutureTask<Long> task = new FutureTask<Long>(subCalc);
			tasks.add(task);
			if (!exec.isShutdown()) {
				// 因為 FutureTask 實現了 Runnable,所以可將 FutureTask 提交給 Executor 執行。
				exec.submit(task);
			}
		}
		
		return getResult();
	}

	/**
	 * 迭代每個只任務,獲得部分和,相加返回
	 * 
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (Future<Long> task : tasks) {
			try {
				// 如果計算未完成則阻塞  (Future中儲存的是Callable的執行結果,可以使用get得到)
				Long subSum = task.get();
				result += subSum;
				
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
		return result;
	}
}

CompletionService

        public interfaceCompletionService<V>

        CompletionService用於提交一組Callable任務,其take方法返回一個已完成的Callable任務對應的Future物件。好比同時種幾塊麥子等待收割,收割時哪塊先熟先收哪塊。

        將生產新的非同步任務與使用已完成任務的結果分離開來的服務。生產者 submit 執行的任務。使用者 take 已完成的任務,並按照完成這些任務的順序處理它們的結果。例如,CompletionService 可以用來管理非同步 IO ,執行讀操作的任務作為程式或系統的一部分提交,然後當完成讀操作時,會在程式的不同部分執行其他操作,執行操作的順序可能與所請求的順序不同。

        通常,CompletionService 依賴於一個單獨的Executor 來實際執行任務,在這種情況下,CompletionService 只管理一個內部完成佇列。ExecutorCompletionService 類提供了此介面的一個實現。

CompletionService介面方法摘要

        1、Future<V>poll():獲取並移除表示下一個已完成任務的 Future,如果不存在這樣的任務,則返回null。

        2、Future<V>poll(long timeout, TimeUnit unit):獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則將等待指定的時間(如果有必要)。

        3Future<V> submit(Callable<V> task):提交要執行的值返回任務,並返回表示掛起的任務結果的 Future

        4、Future<V>submit(Runnable task, V result):提交要執行的 Runnable 任務,並返回一個表示任務完成的 Future,可以提取或輪詢此任務。

        5Future<V> take():獲取並移除表示下一個已完成任務的 Future,如果目前不存在這樣的任務,則等待。

ExecutorCompletionService方法摘要

    1、ExecutorCompletionService(Executorexecutor):使用為執行基本任務而提供的執行程式建立一個 ExecutorCompletionService,並將 LinkedBlockingQueue 作為完成佇列。

        2、ExecutorCompletionService(Executor executor,BlockingQueue<Future<V>> completionQueue):使用為執行基本任務而提供的執行程式建立一個 ExecutorCompletionService,並將所提供的佇列作為其完成佇列。

示例:

// CompletionService使用例項:按照任務完成的順序處理它們的結果
ExecutorService threadPool2 = Executors.newFixedThreadPool(10);
CompletionService<Integer> completionService = new ExecutorCompletionService<Integer>(threadPool2);
// 提交10個任務
for(int i = 1; i <= 10; i++) {
	final int seq = i;
	completionService.submit(new Callable<Integer>() {

		@Override
		public Integer call() throws Exception {
			Thread.sleep(new Random().nextInt(5000));
				return seq;
			}
		});
	}

	// 獲取任務的返回結果
	for(int i = 1; i <= 10; i++) {
		try {
			System.out.println(completionService.take().get());

		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
	}
}

例子:平行計算陣列的和(改寫上述方法)。

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 
 * @Description: 平行計算陣列的和
 *
 * @author: zxt
 *
 * @time: 2018年4月8日 下午8:19:28
 *
 */
public class ConcurrentCalculator2 {
	
	private ExecutorService exec;
	private int cpuCoreNumber;
	private CompletionService<Long> completionService;
	
	public ConcurrentCalculator2() {
		// 得到cpu核心的個數
		cpuCoreNumber = Runtime.getRuntime().availableProcessors();
		// 有幾個cpu,則建立有幾個執行緒的執行緒池
		exec = Executors.newFixedThreadPool(cpuCoreNumber);
		// 按照任務完成的順序處理它們的結果
		completionService = new ExecutorCompletionService<Long>(exec);
	}
	
	public void close() {
        exec.shutdown();
    }
	
	public static void main(String[] args) {
		int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 };
		ConcurrentCalculator2 calc = new ConcurrentCalculator2();
		Long sum = calc.sum(numbers);
		System.out.println(sum);
		calc.close();
	}

	// 內部類,實現Callable介面,將其例項提交給Executor可執行
	class SumCalculator implements Callable<Long> {
		private int[] numbers;
		private int start;
		private int end;

		public SumCalculator(final int[] numbers, int start, int end) {
			this.numbers = numbers;
			this.start = start;
			this.end = end;
		}

		public Long call() throws Exception {
			Long sum = 0l;
			for (int i = start; i < end; i++) {
				sum += numbers[i];
			}
			
			return sum;
		}
	}


	public Long sum(final int[] numbers) {
		// 根據CPU核心個數拆分任務,建立FutureTask並提交到Executor
		for (int i = 0; i < cpuCoreNumber; i++) {
			// 將陣列分成多端,使用多個任務計算
			int increment = numbers.length / cpuCoreNumber + 1;
			int start = increment * i;
			int end = increment * i + increment;
			if (end > numbers.length) {
				end = numbers.length;
			}
			
			SumCalculator subCalc = new SumCalculator(numbers, start, end);
			if (!exec.isShutdown()) {
				// 通過CompletionService服務提交Callable任務執行
				completionService.submit(subCalc);
			}
		}
		
		return getResult();
	}

	/**
	 * 迭代每個只任務,獲得部分和,相加返回
	 * 
	 * @return
	 */
	public Long getResult() {
		Long result = 0l;
		for (int i = 0; i < cpuCoreNumber; i++) {
			try {
				// 按任務完成的順序得到返回結果
				Long subSum = completionService.take().get();
				result += subSum;
				
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
		return result;
	}
}