1. 程式人生 > >Java高併發程式設計——為IO密集型應用設計執行緒數與劃分任務

Java高併發程式設計——為IO密集型應用設計執行緒數與劃分任務

文章轉自:http://www.tuicool.com/articles/fA7rMn

實際工作中的三類程式適用於以併發的形式來提速:

1. 服務程式:同時響應多個使用者請求

2. 計算密集型程式:併發計算,將問題拆分為子任務、併發執行各子任務並最終將子任務的結果彙總合併。

3. IO密集型程式(阻塞型):常需要阻塞等待的程式,比如說因為網路環境阻塞等待,因為IO讀取阻塞等待。當一個任務阻塞在IO操作上時,我們可以立即切換執行其他任務或啟動其他IO操作請求,這樣併發就可以幫助我們有效地提升程式執行效率。

對於IO密集型程式,我們要用併發來獲得執行效率的大幅提升時,首先要思考兩個問題:如何估計需要建立多少個執行緒以及如何分解問題。這裡也涉及到如何估算併發帶來的效能提升的程度。

1.  確定執行緒數

確定執行緒數首先需要考慮到系統可用的處理器核心數:

Runtime.getRuntime().availableProcessors();

應用程式最小執行緒數應該等於可用的處理器核數。如果所有的任務都是計算密集型的,則建立處理器可用核心數這麼多個執行緒就可以了,這樣已經充分利用了處理器,也就是讓它以最大火力不停進行計算。建立更多的執行緒對於程式效能反而是不利的,因為多個執行緒間頻繁進行上下文切換對於程式效能損耗較大。

        但如果任務都是IO密集型的,那我們就需要建立比處理器核心數大幾倍數量的執行緒。為何?當一個任務執行IO操作時,執行緒將被阻塞,於是處理器可以立即進行上下文切換以便處理其他就緒執行緒。如果我們只有處理器核心數那麼多個執行緒的話,即使有待執行的任務也無法排程處理了。

         因此,執行緒數與我們每個任務處於阻塞狀態的時間比例相關。加入任務有50%時間處於阻塞狀態,那程式所需執行緒數是處理器核心數的兩倍。我們可以計算出程式所需的執行緒數,公式如下:

執行緒數=CPU可用核心數/(1 - 阻塞係數),其中阻塞係數在在0到1範圍內。

計算密集型程式的阻塞係數為0,IO密集型程式的阻塞係數接近1。

         確定阻塞係數,我們可以先試著猜測,或者採用一些效能分析工具或java.lang.management API 來確定執行緒花在系統IO上的時間與CPU密集任務所耗的時間比值。

2. 確定任務的數量

  我們常常希望各個子任務的工作量是均勻分佈的,這樣每個執行緒的負載都差不多。但這通常會花大量的精力去做問題分解。事實證明,把任務儘可能拆解成細粒度,讓它遠比執行緒數多,讓處理器一直不停地工作,是最實惠的方法。

接下來看一看IO密集型應用程式使用併發的一個實用例子:

這是一個求使用者當前股票市值的例子,我們假設在獲取使用者持有股票資訊之後,需向雅虎請求獲得該股票的當前市值。

父類,定義了讀取輸入資料的方式與計時方法,計時方法中呼叫了solve方法進行處理,並獲取執行時間。solve方法由子類實現。
public abstract class AbstractSolver {
	public static Map<String,Integer> readTickers() throws IOException {
		//從檔案或資料庫讀取某使用者持有的股票ID與持股數 並以Map<股票ID,持股數>的形式返回
	}

	public void timeAndCompute() {
		final long start = System.nanoTime();
		final Map<String,Integer> stocks = readTrickers();
		final double result = solve(stocks);
		final long end = System.nanoTime();
		System.out.printf("Number of primes under %d is %d\n", number,
				numberOfPrimes);
		System.out.println("Time (seconds) taken is " + (end - start) / 1.0e9);
	}

	public abstract int solve(final Map<String, Integer> stocks) throws InterruptedException,ExcecutionException,IOException;
}

ConcurrentSolveIO:具體實現。
public class ConcurrentSolveIO extend AbstractSolver {
	public double solve(final Map<String,Integer> stocks)throws InterruptedException,ExcecutionException {
		final int numberOfCores = Runtime.getRuntime().availableProcessors(); //獲得核心數
		final double blockingCoefficient = 0.9;//阻塞係數
		final int poolSize = (int)(numberOfCores / (1 - blockingCoefficient)); //求得執行緒數大小

		System.out.println("Number of cores is " + numberOfCores);
		System.out.println("PoolSize is " + poolSize);
		final List<Callable<Double>> partitions = new ArrayList<Callable<Double>>(); //一系列的任務集合
		for(final String ticker : stocks.keySet()){
			partitions.add(new Callable<Double>(){
				public Double call() throws Exception{
					return stocks.get(triker) * YahooFinance.getPrice(tiker);//YahooFinance.getPrice(tiker)獲得股票市值
				}
			});
		}

		final ExecutorService executorPool = Executors.newFixedThreadPool(poolSize);//建立執行緒池 Java自帶的newFixedThreadPool可以滿足我們生成指定執行緒數的執行緒池的需要 如果任務數大於執行緒數,則所有任務將排隊等待被執行
		final List<Future<Double>> valuesOfStocks = ExecutorPool.invokeAll(partitions, 10000, TimeUnit.SECONDS);//設定任務
		
		double netAssetValue = 0.0;
		for(final Future<Double> valuesOfStocks : valueOfStocks){ //在任務全部執行完成之後,合計結果
			netAssetValue += valuesOfAStock.get();
		}

		executorPool.shutdown(); //關閉執行緒池
		return netAssetValue;
	}

	public static void main(final String[] aargs) throws ExecutionException, InterruptedException, IOException{
		new ConcurrentSolveIO.timeAndComputeValue(); //timeAndComputeValue方法繼承自AbstractIOSolver,其中呼叫了solve()方法並在前後加上計時,用於檢測效能優化的情況
	}
}

這個例子中,各個執行緒之間不需要訪問臨界區,因為在獲得結果後的邏輯較簡單,我們在所有任務停止之後一併處理結果。如果在收到外部介面的相應之後還要進行大量計算的話,則最好是在一有可用結果返回時就立即處理。這用JDK中自帶的CompletionService可以實現這一功能。