Callable+ThreadPoolExecutor實現多執行緒併發並獲得返回值
前言
經常會遇到一些效能問題,比如呼叫某個介面,可能要迴圈呼叫100次,並且需要拿到每一次呼叫的返回結果,通常我們都是放在for迴圈中一次次的序列呼叫,這種方式可想而知道有多慢,那怎麼解決這個問題呢?
多執行緒
為了解決以上問題,我使用的方式是多執行緒。多執行緒常規的有兩種實現方式,即繼承Tread類,實現Runnable介面,但是這兩種實現方式,有一個共同的問題,就是沒有返回值,對於我們來說,獲得每個執行緒的返回值,是個很困難的問題,因此不能用Tread類或Runnable介面,我用的是Callable和ThreadPoolExecutor,Callable的process方法可以允許有返回值,ThreadPoolExecutor的invokeAll或submit方法可以拿到執行緒的執行結果
案例
假設需要給100個使用者傳送郵件,並需要每個使用者的返回結果,先看下程式碼結構
CallableTemplate.java
package com.mairuan.base.concurrent;
import java.util.concurrent.Callable;
/**
* 多執行緒模板類
*
* @author Administrator
*
* @param <V>
*/
public abstract class CallableTemplate<V> implements Callable<V> {
/**
* 前置處理,子類可以Override該方法
*/
public void beforeProcess() {
System.out.println("before process....");
}
/**
* 處理業務邏輯的方法,需要子類去Override
* @return
*/
public abstract V process();
/**
* 後置處理,子類可以Override該方法
*/
public void afterProcess() {
System.out.println("after process...." );
}
@Override
public V call() throws Exception {
beforeProcess();
V result = process();
afterProcess();
return result;
}
}
CallableTemplate類實現了Callable介面,並實現了process方法,該類是一個抽象類,接收任意返回值的型別,beforeProcess方法為前置處理,afterProcess的後置處理,process為具體的業務邏輯抽象方法,該方法在子類中實現
IConcurrentThreadPool.java
package com.mairuan.base.concurrent;
import java.util.List;
import java.util.concurrent.ExecutionException;
public interface IConcurrentThreadPool {
/**
* 初始化執行緒池
*/
void initConcurrentThreadPool();
/**
* 提交單個任務
*
* @param task
* @return
*/
<V> V submit(CallableTemplate<V> task) throws InterruptedException,
ExecutionException;
/**
* 提交多個任務
*
* @param tasks
* @return
*/
<V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
throws InterruptedException, ExecutionException;
}
IConcurrentThreadPool是多執行緒介面類,聲名了三個方法,initConcurrentThreadPool:初始化執行緒池,submit:提交單個任務的執行緒,並有返回值,invokeAll:提交多個任務的執行緒,並有返回值
ConcurrentThreadPool.java
package com.mairuan.base.concurrent.impl;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.mairuan.base.concurrent.CallableTemplate;
import com.mairuan.base.concurrent.IConcurrentThreadPool;
public class ConcurrentThreadPool implements IConcurrentThreadPool {
private ThreadPoolExecutor threadPoolExecutor;
// 核心執行緒數
private int corePoolSize = 10;
// 最大執行緒數
private int maximumPoolSize = 20;
// 超時時間30秒
private long keepAliveTime = 30;
@Override
public void initConcurrentThreadPool() {
threadPoolExecutor = new ThreadPoolExecutor(corePoolSize,
maximumPoolSize, keepAliveTime, TimeUnit.SECONDS,
new LinkedBlockingDeque<Runnable>());
}
@Override
public <V> V submit(CallableTemplate<V> task) throws InterruptedException,
ExecutionException {
Future<V> result = threadPoolExecutor.submit(task);
return result.get();
}
@Override
public <V> List<V> invokeAll(List<? extends CallableTemplate<V>> tasks)
throws InterruptedException, ExecutionException {
List<Future<V>> tasksResult = threadPoolExecutor.invokeAll(tasks);
List<V> resultList = new ArrayList<V>();
for (Future<V> future : tasksResult) {
resultList.add(future.get());
}
return resultList;
}
}
ConcurrentThreadPool是建立執行緒池的實現類,用到了ThreadPoolExecutor執行緒池類及這個類的invokeAll方法和submit方法,這兩個方法的返回值,都可以通過Future類的get方法獲得
ICallableTaskFrameWork.java
public interface ICallableTaskFrameWork {
<V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
throws InterruptedException, ExecutionException;
}
ICallableTaskFrameWork是定義的執行緒任務框架介面,所有的多執行緒呼叫,都通過該介面發起
CallableTaskFrameWork.java
package com.mairuan.base.concurrent.impl;
import java.util.List;
import java.util.concurrent.ExecutionException;
import com.mairuan.base.concurrent.CallableTemplate;
import com.mairuan.base.concurrent.ICallableTaskFrameWork;
import com.mairuan.base.concurrent.IConcurrentThreadPool;
public class CallableTaskFrameWork implements ICallableTaskFrameWork {
private IConcurrentThreadPool concurrentThreadPool = new ConcurrentThreadPool();
@Override
public <V> List<V> submitsAll(List<? extends CallableTemplate<V>> tasks)
throws InterruptedException, ExecutionException {
concurrentThreadPool.initConcurrentThreadPool();
return concurrentThreadPool.invokeAll(tasks);
}
}
CallableTaskFrameWork是ICallableTaskFrameWork 的實現類,在submitsAll實現方法中,通過呼叫執行緒池物件IConcurrentThreadPool介面的invokeAll方法來發起多執行緒的呼叫,這裡注意一個,在submitAll實現方法中,我手動的呼叫了初始化執行緒池的方法concurrentThreadPool.initConcurrentThreadPool(),在真實的專案上,應該在應用啟動的時候就呼叫該方法來初始化執行緒池
測試類程式碼
SendMessageService.java,假設這是一個傳送郵件資訊的服務類
package com.mairuan.base.concurrent.test;
public class SendMessageService {
public void sendMessage(String email,String content){
System.out.println("傳送郵件。。。");
}
}
SendMessageHander.java,多執行緒傳送郵件的處理類
package com.mairuan.base.concurrent.test;
import java.util.HashMap;
import java.util.Map;
import com.mairuan.base.concurrent.CallableTemplate;
public class SendMessageHander extends CallableTemplate<Map<String, String>> {
private String email;
private String content;
public SendMessageHander(String email,String content) {
this.email = email;
this.content = content;
}
@Override
public Map<String, String> process() {
SendMessageService sendMessageService = new SendMessageService();
sendMessageService.sendMessage(email, content);
Map<String, String> map = new HashMap<String, String>();
map.put(email, content);
return map;
}
}
這個類繼承了上面的CallableTemplate,我們要的返回值是Map,因此泛型型別是Map,在類中還重寫了process方法,在方法中呼叫傳送郵件的業務邏輯介面SendMessageService.sendMessage,並將返回結果組裝成Map返回,這裡我就簡單處理了,將郵件地址及內容放在Map中直接返回了;另外還要注意這個類有個有參構造器,通過構建器可以接收需要傳遞進來的引數
SendMessageTest.java,測試類
package com.mairuan.base.concurrent.test;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.ExecutionException;
import com.mairuan.base.concurrent.CallableTemplate;
import com.mairuan.base.concurrent.ICallableTaskFrameWork;
import com.mairuan.base.concurrent.impl.CallableTaskFrameWork;
public class SendMessageTest {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
ICallableTaskFrameWork callableTaskFrameWork = new CallableTaskFrameWork();
List<CallableTemplate<Map<String, String>>> tasks = new ArrayList<CallableTemplate<Map<String, String>>>();
SendMessageHander sendMessageHander = null;
// 將需要傳送郵件的郵件地址及內容組裝好,放在一個集合中
for (int i = 0; i < 1000; i++) {
sendMessageHander = new SendMessageHander("email" + i, "content"
+ i);
tasks.add(sendMessageHander);
}
//通過多執行緒一次性發起郵件,並拿到返回結果集
List<Map<String, String>> results = callableTaskFrameWork
.submitsAll(tasks);
// 解析返回結果集
for (Map<String, String> map : results) {
for (Entry<String, String> entry : map.entrySet()) {
System.out.println(entry.getKey() + "\t" + entry.getValue());
}
}
}
}
執行結果