
Talking About High Concurrency (42): Analyzing the Components of java.util.concurrent (18): Batch Task Execution and CompletionService

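The previous post covered the parts of ExecutorService that deal with asynchronous task execution and state control. This post covers batch task execution. ExecutorService declares the following methods for running tasks in batches: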

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;

<T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;

 <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

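invokeAll returns a list of Futures, one Future for each submitted task. invokeAny takes a group of tasks and returns the result of any one task that completed successfully (that is, without throwing an exception); the remaining tasks are cancelled.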
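As a quick illustration of the invokeAny contract described above, here is a minimal sketch. The class name InvokeAnyDemo, the method firstSuccessful, and the two sample tasks are made up for this example, and it uses Java 8 lambdas for brevity. One task always fails and the other succeeds, so the only result invokeAny can return is the successful one.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK.
class InvokeAnyDemo {

    // Submits one failing task and one successful task, and returns
    // whatever invokeAny hands back.
    static String firstSuccessful() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
            Callable<String> ok = () -> "ok";
            List<Callable<String>> tasks = Arrays.asList(failing, ok);
            // The failure is swallowed because another task succeeded.
            return pool.invokeAny(tasks);
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstSuccessful());
    }
}
```

If every task failed, invokeAny would throw an ExecutionException instead of returning.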

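AbstractExecutorService provides default implementations of invokeAll and invokeAny. Reading the code shows how ExecutorService implements batch execution, and also where its limitations lie.

invokeAll is implemented as follows:

1. Wrap each task in a RunnableFuture via newTaskFor (by default this is a FutureTask, which implements RunnableFuture), then hand it to the thread pool by calling execute.

2. Iterate over the Future list. For each task that has not finished yet (!f.isDone()), call f.get(), which blocks until the task completes or throws an exception.

3. Return the results only after every task has completed.

The drawback of invokeAll is obvious: it returns only after all tasks have completed, and no result can be obtained while tasks are still running. If some tasks take a long time and others finish quickly, the results of the quick ones still cannot be returned until every task is done.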

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;
        try {
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (Future<T> f : futures) {
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;
            return futures;
        } finally {
            if (!done)
                for (Future<T> f : futures)
                    f.cancel(true);
        }
    }
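The blocking behaviour of invokeAll can be observed from the caller's side. The following is a minimal sketch; the class InvokeAllDemo, the method squares, and the sleep durations are assumptions chosen only for illustration. The first task is deliberately the slowest, yet by the time invokeAll returns every Future is already done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
class InvokeAllDemo {

    // Runs three tasks that compute n * n; earlier tasks sleep longer.
    static List<Integer> squares() {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= 3; i++) {
                final int n = i;
                tasks.add(() -> {
                    Thread.sleep(50L * (4 - n)); // task 1 is the slowest
                    return n * n;
                });
            }
            // Blocks until all three tasks have completed.
            List<Future<Integer>> futures = pool.invokeAll(tasks);
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> f : futures) {
                if (!f.isDone()) {
                    throw new IllegalStateException("invokeAll returned an unfinished Future");
                }
                results.add(f.get()); // does not block: the task is already done
            }
            return results;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares());
    }
}
```

The Futures come back in the order of the input collection, so the fast tasks' results sit behind the slow one until the whole batch finishes.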

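It is precisely this limitation of ExecutorService, that a batch call must wait for all tasks to complete before returning any result, that the CompletionService interface was introduced to address. CompletionService solves the problem with a completion queue. Looking at the interface definition, its functionality falls into two parts:

1. submit: submits a task and returns a Future, which is used to control the state of the asynchronous task.

2. take and poll: the two queue operations. take blocks until a completed task is available; poll either returns immediately or waits for at most a given timeout.

The take and poll methods of CompletionService operate on its completion queue. A task enters the completion queue as soon as it finishes and can be retrieved right away, without waiting for the other tasks to complete.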

public interface CompletionService<V> {
    
    Future<V> submit(Callable<V> task);
    
    Future<V> submit(Runnable task, V result);
   
    Future<V> take() throws InterruptedException;

    Future<V> poll();

    Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
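To make the "results in completion order" idea concrete, here is a minimal sketch using ExecutorCompletionService. The class CompletionOrderDemo, the method completionOrder, and the task names are hypothetical. A CountDownLatch keeps the first-submitted task blocked so that the ordering is deterministic rather than dependent on timing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class, not part of the JDK.
class CompletionOrderDemo {

    // Submits "slow" first and "fast" second, then collects results
    // in the order the tasks actually finish.
    static List<String> completionOrder() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        CountDownLatch releaseSlow = new CountDownLatch(1);
        try {
            ecs.submit(() -> { releaseSlow.await(); return "slow"; });
            ecs.submit(() -> "fast");

            List<String> order = new ArrayList<>();
            order.add(ecs.take().get()); // "fast": the slow task is still blocked
            releaseSlow.countDown();     // now let the slow task finish
            order.add(ecs.take().get()); // "slow"
            return order;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(completionOrder());
    }
}
```

With invokeAll the caller would have been stuck behind the slow task; here the fast result is available while the slow task is still running.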

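ExecutorCompletionService implements the CompletionService interface. It has three fields:

1. private final Executor executor; the Executor that actually runs the tasks
2. private final AbstractExecutorService aes;  its newTaskFor method is used to adapt Runnable and Callable tasks into a uniform RunnableFuture (the field is null when the executor is not an AbstractExecutorService)
3. private final BlockingQueue<Future<V>> completionQueue;   the completion queue that holds finished tasks; it is a blocking queue

The inner class QueueingFuture extends FutureTask. Its purpose is to override FutureTask's done method so that a finished task is automatically placed into the completion queue completionQueue.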

private class QueueingFuture extends FutureTask<Void> {
        QueueingFuture(RunnableFuture<V> task) {
            super(task, null);
            this.task = task;
        }
        protected void done() { completionQueue.add(task); }
        private final Future<V> task;
    }
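The done() hook can be tried in isolation. The sketch below is a simplified variation, not the JDK's QueueingFuture: instead of wrapping another RunnableFuture, the hypothetical class EnqueueOnDone adds itself to a queue when it completes. All names here are assumptions made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class, not part of the JDK.
class DoneHookDemo {

    // A FutureTask that publishes itself to a queue once it has finished
    // (normally, exceptionally, or by cancellation).
    static class EnqueueOnDone<V> extends FutureTask<V> {
        private final BlockingQueue<Future<V>> queue;

        EnqueueOnDone(Callable<V> callable, BlockingQueue<Future<V>> queue) {
            super(callable);
            this.queue = queue;
        }

        @Override
        protected void done() {
            queue.add(this);
        }
    }

    // Runs one task on the calling thread and reads its result back
    // through the queue rather than through the task reference.
    static int runAndCollect() {
        BlockingQueue<Future<Integer>> queue = new LinkedBlockingQueue<>();
        FutureTask<Integer> task = new EnqueueOnDone<Integer>(() -> 21 * 2, queue);
        task.run(); // done() fires at the end of run()
        try {
            return queue.take().get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndCollect());
    }
}
```

Because done() is also invoked on cancellation and failure, consumers of the queue still need to handle exceptions from get().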

The single-argument constructor of ExecutorCompletionService uses a LinkedBlockingQueue as the blocking completion queue:
  public ExecutorCompletionService(Executor executor) {
        if (executor == null)
            throw new NullPointerException();
        this.executor = executor;
        this.aes = (executor instanceof AbstractExecutorService) ?
            (AbstractExecutorService) executor : null;
        this.completionQueue = new LinkedBlockingQueue<Future<V>>();
    }

Its submit methods are simple: wrap the submitted task in a QueueingFuture and hand it to the Executor for execution.
 public Future<V> submit(Callable<V> task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task);
        executor.execute(new QueueingFuture(f));
        return f;
    }

    public Future<V> submit(Runnable task, V result) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<V> f = newTaskFor(task, result);
        executor.execute(new QueueingFuture(f));
        return f;
    }

The take and poll methods are equally simple: they delegate directly to the blocking-queue operations of the completion queue completionQueue.
 public Future<V> take() throws InterruptedException {
        return completionQueue.take();
    }

    public Future<V> poll() {
        return completionQueue.poll();
    }

    public Future<V> poll(long timeout, TimeUnit unit)
            throws InterruptedException {
        return completionQueue.poll(timeout, unit);
    }
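The difference between the three retrieval methods is easiest to see while a task is still running. The following is a minimal sketch; the class PollDemo, the method pollThenTake, and the 10 ms timeout are assumptions for illustration. A latch holds the only task open, so both poll variants find the completion queue empty and return null, after which take() blocks until the task finishes.

```java
import java.util.concurrent.CompletionService;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the JDK.
class PollDemo {

    static String pollThenTake() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        CountDownLatch gate = new CountDownLatch(1);
        try {
            ecs.submit(() -> { gate.await(); return "done"; });

            // Nothing has completed yet, so the non-blocking poll returns null.
            Future<String> none = ecs.poll();
            // The timed poll waits up to 10 ms, then also gives up with null.
            Future<String> stillNone = ecs.poll(10, TimeUnit.MILLISECONDS);

            gate.countDown();                     // let the task finish
            Future<String> finished = ecs.take(); // blocks until it is queued
            return (none == null) + "," + (stillNone == null) + "," + finished.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollThenTake());
    }
}
```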

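Having looked at how CompletionService is implemented, let's see how it is used. The doInvokeAny method of AbstractExecutorService uses a CompletionService.

invokeAny submits a group of tasks, returns the result as soon as one task completes successfully, and then cancels the other tasks.

1. List<Future<T>> futures= new ArrayList<Future<T>>(ntasks); creates a Future list to hold the Futures of the tasks

2. ExecutorCompletionService<T> ecs = new ExecutorCompletionService<T>(this);  creates a CompletionService

3. futures.add(ecs.submit(it.next()));  submits the first task

4. Inside an infinite loop, first check whether any task has finished:   Future<T> f = ecs.poll();

5. If f != null, a task has completed, so call f.get() to fetch its result. If a result is obtained, return it directly. If an exception is thrown, record it and continue the loop.

6. If f == null, no task has completed yet, so submit one more task if any remain. Once all tasks have been submitted, exit the loop if none is still active; in the timed case, poll with the remaining time and throw TimeoutException if it expires; otherwise block in take(). If the loop exits without any task succeeding, the last recorded ExecutionException is thrown.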

public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
        try {
            return doInvokeAny(tasks, false, 0);
        } catch (TimeoutException cannotHappen) {
            assert false;
            return null;
        }
    }

 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            long lastTime = timed ? System.nanoTime() : 0;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            futures.add(ecs.submit(it.next()));
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        long now = System.nanoTime();
                        nanos -= now - lastTime;
                        lastTime = now;
                    }
                    else
                        f = ecs.take();
                }
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (Future<T> f : futures)
                f.cancel(true);
        }
    }
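The same pattern can be written outside AbstractExecutorService. The sketch below is a simplified variation, not the JDK algorithm: it submits every task up front instead of interleaving submission with polling, and it has no timeout support. The class FirstSuccessDemo and the methods firstSuccess and demo are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK.
class FirstSuccessDemo {

    // Returns the first successfully computed result and cancels the rest.
    static <T> T firstSuccess(Executor executor, List<Callable<T>> tasks)
            throws InterruptedException, ExecutionException {
        CompletionService<T> ecs = new ExecutorCompletionService<>(executor);
        List<Future<T>> futures = new ArrayList<>(tasks.size());
        ExecutionException last = null;
        try {
            for (Callable<T> task : tasks) {
                futures.add(ecs.submit(task));
            }
            // Take completed tasks one by one until one of them succeeded.
            for (int i = 0; i < tasks.size(); i++) {
                try {
                    return ecs.take().get();
                } catch (ExecutionException e) {
                    last = e; // remember the failure and try the next one
                }
            }
            throw last != null
                ? last
                : new ExecutionException(new IllegalArgumentException("no tasks"));
        } finally {
            // Cancelling an already completed Future has no effect.
            for (Future<T> f : futures) {
                f.cancel(true);
            }
        }
    }

    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            Callable<String> failing = () -> { throw new IllegalStateException("boom"); };
            Callable<String> ok = () -> "ok";
            return firstSuccess(pool, Arrays.asList(failing, ok));
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Submitting everything up front is simpler, but on an executor with limited parallelism it may queue work that the JDK's incremental approach would never have started.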