Java併發程式設計之執行緒池、Callable和Future使用
知識儲備
收藏幾篇好文章:
目錄結構
- Callable和Future使用
- 執行緒池使用
Callable和Future使用
多執行緒實現方式很多,分為兩類:1、沒有返回值的;2、有返回值的。
針對“沒有返回值的”這類可以參考《Java建立執行緒的兩種方法比較》,本文不作贅述。本文僅說明Callable方式的多執行緒實現。
首先,建立任務類實現Callable介面,重寫call函式,定義其返回型別。
/**
* Created by loongshawn on 2017/7/17.
*
* 併發任務
*/
public class TaskCallable implements Callable<DatabaseSearchResponse> {
private DataBaseAuthentication dataBaseAuthentication;
private String filepath;
private String extension;
private String checklinks;
public TaskCallable(DataBaseAuthentication dataBaseAuthentication,String filepath,String extension,String checklinks){
this .dataBaseAuthentication = dataBaseAuthentication;
this.filepath = filepath;
this.extension = extension;
this.checklinks = checklinks;
}
public DatabaseSearchResponse call() throws Exception {
// 任務查詢
DatabaseSearchRequest databaseSearchRequest = new DatabaseSearchRequest();
DatabaseSearchResponse databaseSearchResponse = databaseSearchRequest.execute(dataBaseAuthentication,filepath,extension,checklinks);
return databaseSearchResponse;
}
}
然後,構建多執行緒建立主體函式。僅提供程式碼片段。
public Response execute(SearchParameter para){
Response response = new Response();
ExecutorService executorService = Executors.newCachedThreadPool();
List<Future<DatabaseSearchResponse>> results = new ArrayList<Future<DatabaseSearchResponse>>();
// 獲取資料庫引數
...
for (String dataBaseName : dataBaseNames){
// 資料庫查詢任務
TaskCallable task = new TaskCallable(dataBaseAuthentication,filepath,extension,checklinks);
results.add(executorService.submit(task));
}
executorService.shutdown();
for (Future<DatabaseSearchResponse> searchResponseFuture: results){
JSONObject jsonObject = new JSONObject();
while (true) {
if(searchResponseFuture.isDone() && !searchResponseFuture.isCancelled()) {
DatabaseSearchResponse databaseSearchResponse = searchResponseFuture.get();
break;
} else {
Thread.sleep(100);
}
}
}
...
return response;
}
其中Future<?>用來接收執行緒執行結果,從Future字面意思理解,這是一個將來的結果,也就是說要想獲得執行緒執行結果,需要判斷其是否執行完畢。詳細可參考官方API文件。
執行緒池使用
在上例中,用到了執行緒池,採用了Executors提供的靜態方法初始化執行緒池。
ExecutorService executorService = Executors.newCachedThreadPool();
newCachedThreadPool()方法實現如下,即初始執行緒池沒有建立執行緒,只有在有新任務時才會建立執行緒去執行任務,空閒執行緒等待時間60秒。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ExecutorService與ThreadPoolExecutor是什麼關係,有什麼差異,因為通過ThreadPoolExecutor也能夠實現執行緒池。
public class Test {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));
for(int i=0;i<15;i++){
MyTask myTask = new MyTask(i);
executor.execute(myTask);
}
executor.shutdown();
}
}
首先來看看ThreadPoolExecutor的execute函式,這個函式返回void:
void execute(Runnable command)
//Executes the given task sometime in the future.
然後再來看看ExecutorService的submit函式,這個函式返回Future<?>,即有返回值,這是兩者的一個差異之處。
Future<?> submit(Runnable task)
//Submits a Runnable task for execution and returns a Future representing that task.
接下來,通過樹狀圖來看看執行緒池相關類間的關係,可以查閱原始碼看之間的關係:
- Executor是頂層介面,僅提供execute方法。
- ExecutorService介面繼承了Executor介面,豐富了介面函式。
- AbstractExecutorService抽象類實現了ExecutorService介面。
- ThreadPoolExecutor類繼承了AbstractExecutorService類。
關於corePoolSize、maximumPoolSize、blockingQueue引數的說明:
部分原始碼
Executor介面
public interface Executor {
void execute(Runnable command);
}
ExecutorService介面
public interface ExecutorService extends Executor {
void shutdown();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService抽象類
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
public Future<?> submit(Runnable task) {};
public <T> Future<T> submit(Runnable task, T result) { };
public <T> Future<T> submit(Callable<T> task) { };
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
};
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
};
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
};
}
ThreadPoolExecutor類
public class ThreadPoolExecutor extends AbstractExecutorService {
.....
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
}
原始碼解析
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
片段1: command為空,返回NullPointerException。
if (command == null)
throw new NullPointerException();
片段2: 如果當前執行緒數量小於corePoolSize,則線上程池中新啟一個任務執行緒。
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
片段3: 如果當前執行緒池正常執行,且任務成功新增到快取佇列中。則會進行一次double check:1、執行緒池shut down,移除佇列任務,拒絕新任務;2、執行緒死掉了。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
片段4: 任務新增到快取佇列失敗,拒絕新任務,可能是執行緒池shut down或者執行緒池已達最大容量。
else if (!addWorker(command, false))
reject(command);