1. 程式人生 > >Java併發程式設計之執行緒池、Callable和Future使用

Java併發程式設計之執行緒池、Callable和Future使用

知識儲備

收藏幾篇好文章:

目錄結構

  • Callable和Future使用
  • 執行緒池使用

Callable和Future使用

多執行緒實現方式很多,分為兩類:1、沒有返回值的;2、有返回值的。

針對“沒有返回值的”這類可以參考《Java建立執行緒的兩種方法比較》,本文不作贅述。本文僅說明Callable方式的多執行緒實現。

首先,建立任務類實現Callable介面,重寫call函式,定義其返回型別。

/**
 * Created by loongshawn on 2017/7/17.
 *
 * 併發任務
 */
public class TaskCallable implements
Callable<DatabaseSearchResponse> {
private DataBaseAuthentication dataBaseAuthentication; private String filepath; private String extension; private String checklinks; public TaskCallable(DataBaseAuthentication dataBaseAuthentication,String filepath,String extension,String checklinks){ this
.dataBaseAuthentication = dataBaseAuthentication; this.filepath = filepath; this.extension = extension; this.checklinks = checklinks; } public DatabaseSearchResponse call() throws Exception { // 任務查詢 DatabaseSearchRequest databaseSearchRequest = new DatabaseSearchRequest(); DatabaseSearchResponse databaseSearchResponse = databaseSearchRequest.execute(dataBaseAuthentication,filepath,extension,checklinks); return
databaseSearchResponse; } }

然後,構建多執行緒建立主體函式。僅提供程式碼片段。


public Response execute(SearchParameter para){

        Response response = new Response();
        ExecutorService executorService = Executors.newCachedThreadPool();
        List<Future<DatabaseSearchResponse>> results = new ArrayList<Future<DatabaseSearchResponse>>();
        // 獲取資料庫引數
        ...
        for (String dataBaseName : dataBaseNames){

            // 資料庫查詢任務
            TaskCallable task = new TaskCallable(dataBaseAuthentication,filepath,extension,checklinks);
            results.add(executorService.submit(task));
        }

        executorService.shutdown();

        for (Future<DatabaseSearchResponse> searchResponseFuture: results){
            JSONObject jsonObject = new JSONObject();
            while (true) {
                if(searchResponseFuture.isDone() && !searchResponseFuture.isCancelled()) {
                    DatabaseSearchResponse databaseSearchResponse = searchResponseFuture.get();
                    break;
                } else {
                     Thread.sleep(100); 
                }
            }
        }
        ...
        return response;
    }

其中Future<?>用來接收執行緒執行結果,從Future字面意思理解,這是一個將來的結果,也就是說要想獲得執行緒執行結果,需要判斷其是否執行完畢。詳細可參考官方API文件

這裡寫圖片描述

這裡寫圖片描述

執行緒池使用

這裡寫圖片描述

在上例中,用到了執行緒池,採用了Executors提供的靜態方法初始化執行緒池。

ExecutorService executorService = Executors.newCachedThreadPool();

newCachedThreadPool()方法實現如下,即初始執行緒池沒有建立執行緒,只有在有新任務時才會建立執行緒去執行任務,空閒執行緒等待時間60秒。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

ExecutorService與ThreadPoolExecutor是什麼關係,有什麼差異,因為通過ThreadPoolExecutor也能夠實現執行緒池。

public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));

         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
         }
         executor.shutdown();
     }
}

首先來看看ThreadPoolExecutor的execute函式,這個函式返回void:

void execute(Runnable command)
//Executes the given task sometime in the future.

然後再來看看ExecutorService的submit函式,這個函式返回Future<?>,即有返回值,這是兩者的一個差異之處。


Future<?>   submit(Runnable task)
//Submits a Runnable task for execution and returns a Future representing that task.

接下來,通過樹狀圖來看看執行緒池相關類間的關係,可以查閱原始碼看之間的關係:

這裡寫圖片描述

  • Executor是頂層介面,僅提供execute方法。
  • ExecutorService介面繼承了Executor介面,豐富了介面函式。
  • AbstractExecutorService抽象類實現了ExecutorService介面。
  • ThreadPoolExecutor類繼承了AbstractExecutorService類。

關於corePoolSize、maximumPoolSize、blockingQueue引數的說明:

這裡寫圖片描述

部分原始碼

Executor介面

public interface Executor {
    void execute(Runnable command);
}

ExecutorService介面

public interface ExecutorService extends Executor {

    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

AbstractExecutorService抽象類

public abstract class AbstractExecutorService implements ExecutorService {

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

ThreadPoolExecutor類

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }
}

原始碼解析

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
}

片段1: command為空,返回NullPointerException。

if (command == null)
            throw new NullPointerException();

片段2: 如果當前執行緒數量小於corePoolSize,則線上程池中新啟一個任務執行緒。

int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

片段3: 如果當前執行緒池正常執行,且任務成功新增到快取佇列中。則會進行一次double check:1、執行緒池shut down,移除佇列任務,拒絕新任務;2、執行緒死掉了。

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

片段4: 任務新增到快取佇列失敗,拒絕新任務,可能是執行緒池shut down或者執行緒池已達最大容量。

else if (!addWorker(command, false))
            reject(command);