1. 程式人生 > >執行緒池原理--執行器AbstractExecutorService

執行緒池原理--執行器AbstractExecutorService

文章目錄


執行緒池原理–總索引

執行緒池原理–執行器AbstractExecutorService

AbstractExecutorService

AbstractExecutorService是一個抽象類,實現自ExecutorService介面。該類實現了介面中的一些方法。

Sumit

可以看到,不管提交的是Runnable型別或者Callable型別的任務,最終都是建立FutureTask型別物件,並提交給execute()方法執行,execute()方法由子類實現。

 public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }
    public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task, result); execute(ftask); return ftask; } public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException
(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; } protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { return new FutureTask<T>(runnable, value); } protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { return new FutureTask<T>(callable); }

批量提交任務

invokeAll 用於批量提交任務,該方法會阻塞,來看一下其實現原理。
如果發生任何異常,所有的任務都會被取消。

 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
             // 迴圈提交任務給execute()
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            //迴圈檢測所有任務是否執行完畢
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                //在這裡阻塞一直到f的任務執行完畢。
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ignore) {}
                    catch (ExecutionException ignore) {}
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

doInvokeAny也是用於批量提交任務,也會發生阻塞,但是隻要有一個任務執行完畢,那麼便會退出阻塞狀態並返回。
並且如果發生任何異常,所有的任務都會被取消。

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        //記錄當前未提交的任務數量
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<>(ntasks);
        ExecutorCompletionService<T> ecs =
            new ExecutorCompletionService<T>(this);

        // For efficiency, especially in executors with limited
        // parallelism, check to see if previously submitted tasks are
        // done before submitting more of them. This interleaving
        // plus the exception mechanics account for messiness of main
        // loop.

        try {
            // Record exceptions so that if we fail to obtain any
            // result, we can throw the last exception we got.
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();

            // Start one task for sure; the rest incrementally
            //提交第一個任務
            futures.add(ecs.submit(it.next()));
            //每提交第一個任務,ntasks-1
            --ntasks;
            int active = 1;

            for (;;) {
            //檢索並移除下一個要完成的任務
                Future<T> f = ecs.poll();
                //f == null ,說明還沒有任務要完成,繼續新增任務執行
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                        //超時判斷處理
                    else if (timed) {
                        f = ecs.poll(nanos, NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else
                        f = ecs.take();
                }
                //f!= null ,說明已經有任務完成,返回結果
                if (f != null) {
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)
                ee = new ExecutionException();
            throw ee;

        } finally {
            cancelAll(futures);
        }
    }