Java高併發程式設計——為IO密集型應用設計執行緒數與劃分任務
文章轉自:http://www.tuicool.com/articles/fA7rMn
實際工作中的三類程式適用於以併發的形式來提速:
1. 服務程式:同時響應多個使用者請求
2. 計算密集型程式:併發計算,將問題拆分為子任務、併發執行各子任務並最終將子任務的結果彙總合併。
3. IO密集型程式(阻塞型):常需要阻塞等待的程式,比如說因為網路環境阻塞等待,因為IO讀取阻塞等待。當一個任務阻塞在IO操作上時,我們可以立即切換執行其他任務或啟動其他IO操作請求,這樣併發就可以幫助我們有效地提升程式執行效率。
對於IO密集型程式,我們要用併發來獲得執行效率的大幅提升時,首先要思考兩個問題:如何估計需要建立多少個執行緒以及如何分解問題。這裡也涉及到如何估算併發帶來的效能提升的程度。
1. 確定執行緒數
確定執行緒數首先需要考慮到系統可用的處理器核心數:
Runtime.getRuntime().availableProcessors();
應用程式最小執行緒數應該等於可用的處理器核數。如果所有的任務都是計算密集型的,則建立處理器可用核心數這麼多個執行緒就可以了,這樣已經充分利用了處理器,也就是讓它以最大火力不停進行計算。建立更多的執行緒對於程式效能反而是不利的,因為多個執行緒間頻繁進行上下文切換對於程式效能損耗較大。
但如果任務都是IO密集型的,那我們就需要建立比處理器核心數大幾倍數量的執行緒。為何?當一個任務執行IO操作時,執行緒將被阻塞,於是處理器可以立即進行上下文切換以便處理其他就緒執行緒。如果我們只有處理器核心數那麼多個執行緒的話,即使有待執行的任務也無法排程處理了。
因此,執行緒數與我們每個任務處於阻塞狀態的時間比例相關。加入任務有50%時間處於阻塞狀態,那程式所需執行緒數是處理器核心數的兩倍。我們可以計算出程式所需的執行緒數,公式如下:
執行緒數=CPU可用核心數/(1 - 阻塞係數),其中阻塞係數在在0到1範圍內。
計算密集型程式的阻塞係數為0,IO密集型程式的阻塞係數接近1。
確定阻塞係數,我們可以先試著猜測,或者採用一些效能分析工具或java.lang.management API 來確定執行緒花在系統IO上的時間與CPU密集任務所耗的時間比值。
2. 確定任務的數量
我們常常希望各個子任務的工作量是均勻分佈的,這樣每個執行緒的負載都差不多。但這通常會花大量的精力去做問題分解。事實證明,把任務儘可能拆解成細粒度,讓它遠比執行緒數多,讓處理器一直不停地工作,是最實惠的方法。
接下來看一看IO密集型應用程式使用併發的一個實用例子:
這是一個求使用者當前股票市值的例子,我們假設在獲取使用者持有股票資訊之後,需向雅虎請求獲得該股票的當前市值。
父類,定義了讀取輸入資料的方式與計時方法,計時方法中呼叫了solve方法進行處理,並獲取執行時間。solve方法由子類實現。public abstract class AbstractSolver {
public static Map<String,Integer> readTickers() throws IOException {
//從檔案或資料庫讀取某使用者持有的股票ID與持股數 並以Map<股票ID,持股數>的形式返回
}
public void timeAndCompute() {
final long start = System.nanoTime();
final Map<String,Integer> stocks = readTrickers();
final double result = solve(stocks);
final long end = System.nanoTime();
System.out.printf("Number of primes under %d is %d\n", number,
numberOfPrimes);
System.out.println("Time (seconds) taken is " + (end - start) / 1.0e9);
}
public abstract int solve(final Map<String, Integer> stocks) throws InterruptedException,ExcecutionException,IOException;
}
ConcurrentSolveIO:具體實現。
public class ConcurrentSolveIO extend AbstractSolver {
public double solve(final Map<String,Integer> stocks)throws InterruptedException,ExcecutionException {
final int numberOfCores = Runtime.getRuntime().availableProcessors(); //獲得核心數
final double blockingCoefficient = 0.9;//阻塞係數
final int poolSize = (int)(numberOfCores / (1 - blockingCoefficient)); //求得執行緒數大小
System.out.println("Number of cores is " + numberOfCores);
System.out.println("PoolSize is " + poolSize);
final List<Callable<Double>> partitions = new ArrayList<Callable<Double>>(); //一系列的任務集合
for(final String ticker : stocks.keySet()){
partitions.add(new Callable<Double>(){
public Double call() throws Exception{
return stocks.get(triker) * YahooFinance.getPrice(tiker);//YahooFinance.getPrice(tiker)獲得股票市值
}
});
}
final ExecutorService executorPool = Executors.newFixedThreadPool(poolSize);//建立執行緒池 Java自帶的newFixedThreadPool可以滿足我們生成指定執行緒數的執行緒池的需要 如果任務數大於執行緒數,則所有任務將排隊等待被執行
final List<Future<Double>> valuesOfStocks = ExecutorPool.invokeAll(partitions, 10000, TimeUnit.SECONDS);//設定任務
double netAssetValue = 0.0;
for(final Future<Double> valuesOfStocks : valueOfStocks){ //在任務全部執行完成之後,合計結果
netAssetValue += valuesOfAStock.get();
}
executorPool.shutdown(); //關閉執行緒池
return netAssetValue;
}
public static void main(final String[] aargs) throws ExecutionException, InterruptedException, IOException{
new ConcurrentSolveIO.timeAndComputeValue(); //timeAndComputeValue方法繼承自AbstractIOSolver,其中呼叫了solve()方法並在前後加上計時,用於檢測效能優化的情況
}
}
這個例子中,各個執行緒之間不需要訪問臨界區,因為在獲得結果後的邏輯較簡單,我們在所有任務停止之後一併處理結果。如果在收到外部介面的相應之後還要進行大量計算的話,則最好是在一有可用結果返回時就立即處理。這用JDK中自帶的CompletionService可以實現這一功能。