執行緒池原理(七):執行緒池的使用
這篇文章通過介紹執行緒池的常見用法,總結前面學習到的內容。
主要包括
- ThreadPoolExecutor的使用
- ScheduledThreadPoolExecutor的使用
- ExecutorCompletionService的使用
1. 統計某個區間內數字和
這是一個常見的問題,比如計算1到1000000之間的數字和,通常情況下,一個執行緒去遍歷計算即可,既然學到了多執行緒,我們可以將該任務切割,分成多個子任務並交給執行緒池來解決,子任務解決後將這些任務的結果歸併,就可以得到我們要的值了。雖然這個問題很簡單,但是它的解決思路可以用於解決其他類似的問題。
先看下面程式碼:
public class AsyncEngine {
/**
* 執行緒池的執行緒數量不需要太多,它是和CPU核數量有關,太多反而上下文切換效率不好
*/
private static ExecutorService executorService =
new ThreadPoolExecutor(2, 10, 30, TimeUnit.SECONDS, new SynchronousQueue());
/**
* 定時任務執行緒池
*/
private static ScheduledExecutorService scheduledExecutorService
= new ScheduledThreadPoolExecutor(2);
/**
* 完成服務執行緒池
*/
private static CompletionService completionService
= new ExecutorCompletionService(executorService);
/**
* 非同步執行任務,所有任務執行完成後該方法才返回,如果任務執行失敗,該任務返回值為null
* @param tasks 待執行的任務
* @param <T> 任務的返回值
* @return
*/
public static <T> List<T> concurrentExecute(List<Callable<T>> tasks) {
if (tasks == null || tasks.size() == 0) {
return Lists.newArrayList();
}
List<T> res = Lists.newArrayList();
try {
List<Future<T>> futures = executorService.invokeAll(tasks);
for (Future<T> future : futures) {
T t = null;
try {
t = future.get();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (Throwable e) {
e.printStackTrace();
}
res.add(t);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return res;
}
/**
* 定時執行任務
* @param runnable 執行的任務
* @param initialDelay 初始延時
* @param delay 任務間的等待時間
* @param unit 時間單位
* @return
*/
public static ScheduledFuture<?> scheduledWithFixedDelay(Runnable runnable,
long initialDelay,
long delay, TimeUnit unit) {
return scheduledExecutorService.scheduleWithFixedDelay(runnable,
initialDelay,
delay, unit);
}
/**
* 完成服務
* @param tasks
* @param <T>
* @return
*/
public static <T> CompletionService completionExecute(List<Callable<T>> tasks) {
if (tasks == null || tasks.size() == 0) {
return completionService;
}
for (Callable<T> task : tasks) {
completionService.submit(task);
}
return completionService;
}
}
這段程式碼是在專案中實際使用的程式碼,只是作了部分改動,它是一個非同步執行引擎,方法concurrentExecute併發執行任務,當這些任務執行完成後,該方法才會返回。我們定義了一個執行緒池,指定核心執行緒數為2,最大執行緒數為10,執行緒數定義太大無意義,因為它是和CPU核數是有關係的。
concurrentExecute方法中,呼叫執行緒池的invokeAll方法,該方法返回Future的列表,然後遍歷該列表,Future的get方法會阻塞,直到該任務執行完成,將任務返回結果放入List中。注意,當任務丟擲異常時,返回結果為null,該null值也一併放到List中作為concurrentExecute方法的返回值。
有了這個非同步執行引擎,我們繼續看下怎麼實現統計某個區間內的數字和,看下程式碼:
public class Sum {
/**
* 多執行緒併發計算
* @param min
* @param max
* @return
*/
public static long sum1(int min, int max) {
if (min > max) {
return 0L;
}
List<Callable<Long>> tasks = Lists.newArrayList();
while (min < max) {
final int left = min;
final int right = max;
//分割任務,每個任務最多隻相加1000個數
Callable<Long> task = new Callable<Long>() {
@Override
public Long call() throws Exception {
long sum = 0;
int r = (left + 1000) < right ? (left + 1000) : right;
for (int i = left; i < r; i++) {
sum += i;
}
return sum;
}
};
tasks.add(task);
min += 1000;
}
//非同步執行,執行完成後該方法返回
List<Long> res = AsyncEngine.concurrentExecute(tasks);
long sum = 0;
//歸併結果
for (Long count : res) {
sum += count;
}
return sum;
}
/**
* 單執行緒計算
* @param min
* @param max
* @return
*/
public static long sum2(int min, int max) {
long sum = 0;
for (int i = min; i < max; i++) {
sum += i;
}
return sum;
}
public static void main(String[] args) {
System.out.println(sum1(4, 9999));
System.out.println(sum2(4, 9999));
}
}
併發統計和的重點在於分割任務,每個任務最多相加1000個數字。子任務分割完成後,一併交給非同步執行引擎處理,返回一個列表,該列表儲存了每個任務的返回值,最後歸併這個列表即可。這段程式碼也給出了單執行緒的程式碼,便於對比。
2. 定時任務
這裡的定時任務很簡單,5秒過後開始執行任務,任務執行完成等待1秒再次執行該任務,模擬鬧鐘的秒針執行,每秒嘀嗒一次。上面講到的非同步引擎AsyncEngine已經寫好了定時相關的程式碼,直接執行目標任務即可。
public class Alarm {
public static void main(String[] args) {
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("da");
}
};
//定時排程,模擬時鐘的秒針執行
AsyncEngine.scheduledWithFixedDelay(runnable, 5, 1, TimeUnit.SECONDS);
}
}
3. 完成服務
利用CompletionService來實現生產者消費者模式,對於第一個例子中講到的統計區間數字和,主執行緒其實就是一個消費者,執行緒池中的執行緒是生產者,這些生產者計算各個任務,消費者等待這些任務執行結束。
這裡面的問題是,通過concurrentExecute方法執行任務,需要等到所有任務執行結束該方法才能返回,如果某些任務已經完成,消費者能夠即時的消費,顯然更加合理。因此我們在AsyncEngine裡添加了completionExecute方法,這個方法提交所有的任務到執行緒池,返回CompletionService。消費者(主執行緒)通過CompletionService的take方法取任務Future,取到的Future肯定是已經結束的任務,當take方法阻塞,說明還沒有已經結束的任務。看下Sum類的實現:
public class Sum {
/**
* 多執行緒併發計算
* @param min
* @param max
* @return
*/
public static long sum1(int min, int max) {
if (min > max) {
return 0L;
}
List<Callable<Long>> tasks = Lists.newArrayList();
while (min < max) {
final int left = min;
final int right = max;
//分割任務,每個任務最多隻相加1000個數
Callable<Long> task = new Callable<Long>() {
@Override
public Long call() throws Exception {
long sum = 0;
int r = (left + 1000) < right ? (left + 1000) : right;
for (int i = left; i < r; i++) {
sum += i;
}
return sum;
}
};
tasks.add(task);
min += 1000;
}
//使用CompletionService執行任務
CompletionService<Long> completionService = AsyncEngine.completionExecute(tasks);
long sum = 0;
for ( int i = 0; i < tasks.size(); i++) {
try {
sum += completionService.take().get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
return sum;
}
}
這個類和前面那個Sum類非常類似,這裡只是將任務提交給了CompletionService執行,主執行緒通過CompletionService的take方法取任務的Future,將任務的返回值累加並返回。