1. 程式人生 > >Java 並發編程——Executor框架和線程池原理

Java 並發編程——Executor框架和線程池原理

ignore 程序管理 on() explicit 開發 抽象類 bool i++ RR

Eexecutor作為靈活且強大的異步執行框架,其支持多種不同類型的任務執行策略,提供了一種標準的方法將任務的提交過程和執行過程解耦開發,基於生產者-消費者模式,其提交任務的線程相當於生產者,執行任務的線程相當於消費者,並用Runnable來表示任務,Executor的實現還提供了對生命周期的支持,以及統計信息收集,應用程序管理機制和性能監視等機制。

下面這段代碼中將多個任務放到了線程池中執行:

static class MyRunnable implements Runnable{
    @Override
    public void run() {
        for(int i=0;i<100;i++)
            System.out.println(Thread.currentThread().getName()+":  
"+i); } } static class MyThread extends Thread { public MyThread(String in){ super(in); } @Override public void run(){ System.out.println(Thread.currentThread().getName()); } } public static void main(String[] args) { ExecutorService pool3 = Executors.newFixedThreadPool(2); pool3.execute(new
MyThread("t1")); pool3.execute(new MyThread("t2")); pool3.execute(new MyThread("t3")); pool3.execute(new MyThread("t4")); pool3.shutdown(); ExecutorService pool2 = Executors.newFixedThreadPool(2); pool2.submit(new MyRunnable()); pool2.submit(new MyRunnable()); pool2.shutdownNow(); }

技術分享圖片

Executor:一個接口,其定義了一個接收Runnable對象的方法executor,其方法簽名為executor(Runnable command),

ExecutorService:是一個比Executor使用更廣泛的子類接口,其提供了生命周期管理的方法,以及可跟蹤一個或多個異步任務執行狀況返回Future的方法

AbstractExecutorService:ExecutorService執行方法的默認實現

ScheduledExecutorService:一個可定時調度任務的接口

ScheduledThreadPoolExecutor:ScheduledExecutorService的實現,一個可定時調度任務的線程池

ThreadPoolExecutor:線程池,可以通過調用Executors以下靜態工廠方法來創建線程池並返回一個ExecutorService對象:

1. Executor 接口

Executor是java.util.concurrent 包下的一個接口。
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

該接口十分簡單,只有一個執行的方法 execute() 方法。

2. ExecutorService接口

為了充分理解ExecutorService接口建議先了解:Java 並發編程——Callable+Future+FutureTask

public interface ExecutorService extends Executor;

void shutdown();

啟動一次順序關閉,執行以前提交的任務,但不接受新任務。


List<Runnable> shutdownNow()

試圖停止所有正在執行的活動任務,暫停處理正在等待的任務,並返回等待執行的任務列表。


boolean isShutdown()

*
* @return {@code true} if this executor has been shut down
*/


boolean isTerminated()
@return {@code true} if all tasks have completed following shut down
boolean isTerminated()

只有當shutdown()或者shutdownnow()被調用,而且所有任務都執行完成後才會返回true。


boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
Blocks until all tasks have completed execution after a shutdown
* request, or the timeout occurs, or the current thread is
* interrupted, whichever happens first.


<T> Future<T> submit(Callable<T> task)
<T> Future<T> submit(Runnable task, T result)
Future<?Future> submit(Runnable task)
都是提交一個任務等待執行,只不過第二個函數Future.get()返回值為result。第三個函數Future.get()返回值為null

<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
相當於submit()函數,不同之處是它可以同時提交多個任務,並將Future列表返回,使用者可以遍歷List中的元素進行.get操作。


<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;

同時提交多個任務,返回第一個執行完成的結果。


ExecutorService中execute()函數和submit()函數的區別(開頭線程池使用的代碼中使用了這兩種方法執行一個任務)

1. 接收的參數不一樣,execute接口只能接收Runnable向,而submit接口可以接收多種類型的對象

2.返回值不同,execute沒有返回值,而submit返回一個Future對象

3.Exception處理

There is a difference when looking at exception handling. If your tasks throws an exception and if it was submitted with execute this exception will go to the uncaught exception handler (when you don‘t have provided one explicitly, the default one will just print the stack trace to System.err). If you submitted the task with submit any thrown exception, checked or not, is then part of the task‘s return status. For a task that was submitted with submit and that terminates with an exception, the Future.get will rethrow this exception, wrapped in an ExecutionException.


3. AbstractExecutorService

通過這個類的名字可能就大概知道了這個類的作用,它實現了ExecutorService中的部分方法,對於繼承它的類可以減少實現的代碼。該抽象類中並沒有存放任務或者線程的數組或者Collection所以,對線程隊列的具體管理AbstractExecutorService類並不涉及。

下面看一下裏面幾個關鍵接口的實現:

當向線程池中提交一個任務時,它實際上還是調用execute接口去執行的,然後將執行的結果(如果有的話)一個Future對象返回給任務提交方。

public Future<?> submit(Runnable task) {
        if (task == null) throw new NullPointerException();
        RunnableFuture<Void> ftask = newTaskFor(task, null);
        execute(ftask);
        return ftask;
    }

execute()的實現並沒有放在當前的抽象類中實現,而是讓子類去實現。

再看一下invokeAll的執行過程:

  public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<>(tasks.size());
        try {
           // 將tasks轉換成Futrues類型
            for (Callable<T> t : tasks) {
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
  // 執行future中的get函數
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);
                if (!f.isDone()) {
                    try { f.get(); }
                    catch (CancellationException ignore) {}
                    catch (ExecutionException ignore) {}
                }
            }
            return futures;
        } catch (Throwable t) {
            cancelAll(futures);
            throw t;
        }
    }

其它函數的實現原理基本相同,參考源碼。

4. ThreadPoolExecutor

public class ThreadPoolExecutor extends AbstractExecutorService

為什麽需要線程池

使用線程的時候就去創建一個線程,這樣實現起來非常簡便,但是就會有一個問題:如果並發的線程數量很多,並且每個線程都是執行一個時間很短的任務就結束了,這樣頻繁創建線程就會大大降低系統的效率,因為頻繁創建線程和銷毀線程需要時間。

構造方法

ThreadPoolExecutor是線程池的真正實現,他通過構造方法的一系列參數,來構成不同配置的線程池。常用的構造方法有下面四個:

技術分享圖片

ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue)
ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory)
ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        RejectedExecutionHandler handler)
ThreadPoolExecutor(int corePoolSize,
                        int maximumPoolSize,
                        long keepAliveTime,
                        TimeUnit unit,
                        BlockingQueue<Runnable> workQueue,
                        ThreadFactory threadFactory,
                        RejectedExecutionHandler handler)

構造方法參數說明

  • corePoolSize

    核心線程數,默認情況下核心線程會一直存活,即使處於閑置狀態也不會受存keepAliveTime限制。除非將allowCoreThreadTimeOut設置為true

  • maximumPoolSize

    線程池所能容納的最大線程數。超過這個數的線程將被阻塞。當任務隊列為沒有設置大小的LinkedBlockingDeque時,這個值無效。

  • keepAliveTime

    非核心線程的閑置超時時間,超過這個時間就會被回收。(當線程數沒有超過核心線程數時,這個時間沒有任何意義)

  • unit

    指定keepAliveTime的單位,如TimeUnit.SECONDS。當將allowCoreThreadTimeOut設置為true時對corePoolSize生效。

  • workQueue

    線程池中的任務隊列.

    常用的有三種隊列,SynchronousQueue,LinkedBlockingDeque,ArrayBlockingQueue

  • threadFactory

    線程工廠,提供創建新線程的功能。ThreadFactory是一個接口,只有一個方法。

public interface RejectedExecutionHandler {
  void rejectedExecution(Runnable var1, ThreadPoolExecutor var2);
}

當線程池中的資源已經全部使用,添加新線程被拒絕時,會調用RejectedExecutionHandler的rejectedExecution方法。

下圖為線程池的主要結構:

技術分享圖片

一定要註意一個概念,即存在於線程池中容器的一定是Thread對象,而不是你要求運行的任務(所以叫線程池而不叫任務池也不叫對象池);你要求運行的任務將被線程池分配給某一個空閑的Thread運行。

下面這例子很好的說明了ThreadpoolExecutor的用法:

public class TestThreadPoolExecutor {

   static class MyTask implements Runnable {
        private int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            System.out.println("正在執行task "+taskNum);
            try {
                Thread.currentThread().sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"執行完畢");
        }
    }

    public static void main(String[] args) {
        // 核心線程數為 5
        // 最大線程數為 10(最多同時運行10個線程)
        // 非核心線程沒有任務執行時,最多等待200ms
        // 等待隊列
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,200,
                TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));
        for(int i=0;i<15;i++){
            executor.submit(new MyTask(i));
            System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
                    executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
        }
    }
}

執行結果如下:

技術分享圖片
public class TestThreadPoolExecutor {

   static class MyTask implements Runnable {
        private int taskNum;

        public MyTask(int num) {
            this.taskNum = num;
        }

        @Override
        public void run() {
            System.out.println("正在執行task "+taskNum);
            try {
                Thread.currentThread().sleep(4000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("task "+taskNum+"執行完畢");
        }
    }

    public static void main(String[] args) {
        // 核心線程數為 5
        // 最大線程數為 10(最多同時運行10個線程)
        // 非核心線程沒有任務執行時,最多等待200ms
        // 等待隊列
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5,10,200,
                TimeUnit.MILLISECONDS,new ArrayBlockingQueue<Runnable>(5));
        for(int i=0;i<15;i++){
            executor.submit(new MyTask(i));
            System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
                    executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
        }
    }
}
View Code

這個例子很好的說明了線程池的執行邏輯:當核心線程池為滿時,開啟線程執行提交的任務,當核心線程池滿時,將任務放到等待隊列中等待執行。當隊列已滿時,開啟線程去執行任務(沒有達到最大線程數10),當開啟線程執行完成後執行等待隊列中的線程。

線程池中 核心線程、線程隊列、最大線程的運行準則:

1、首先可以通過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務後,會有幾種處理情況:
1.1、如果當前線程池中運行的線程數量還沒有達到corePoolSize大小時,線程池會創建一個新的線程運行你的任務,無論之前已經創建的線程是否處於空閑狀態。
1.2、如果當前線程池中運行的線程數量已經達到設置的corePoolSize大小,線程池會把你的這個任務加入到等待隊列中。直到某一個的線程空閑了,線程池會根據設置的等待隊列規則,從隊列中取出一個新的任務執行。
1.3、如果根據隊列規則,這個任務無法加入等待隊列。這時線程池就會創建一個“非核心線程”直接運行這個任務。註意,如果這種情況下任務執行成功,那麽當前線程池中的線程數量一定大於corePoolSize。
1.4、如果這個任務,無法被“核心線程”直接執行,又無法加入等待隊列,又無法創建“非核心線程”直接執行,且你沒有為線程池設置RejectedExecutionHandler。這時線程池會拋出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上拋出RejectedExecutionException異常的操作,是ThreadPoolExecutor線程池中一個默認的RejectedExecutionHandler實現:AbortPolicy)
2、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(所有的等待任務都實現了BlockingQueue接口,按照接口字面上的理解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,並停留在哪裏。
3、當線程池中的線程超過你設置的corePoolSize參數,說明當前線程池中有所謂的“非核心線程”。那麽當某個線程處理完任務後,如果等待keepAliveTime時間後仍然沒有新的任務分配給它,那麽這個線程將會被回收。線程池回收線程時,對所謂的“核心線程”和“非核心線程”是一視同仁的,直到線程池中線程的數量等於你設置的corePoolSize參數時,回收過程才會停止。

這裏關於線程池的具體實現先留個坑,到時候再填。

5. 線程池管理工具 Executors

Executors是線程池創建和使用的工具類,它的所有方法都是static的。

  • 生成一個固定大小的線程池:
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

最大線程數設置為與核心線程數相等,此時 keepAliveTime 設置為 0(因為這裏它是沒用的,即使不為 0,線程池默認也不會回收 corePoolSize 內的線程),任務隊列采用 LinkedBlockingQueue,無界隊列。

過程分析:剛開始,每提交一個任務都創建一個 worker,當 worker 的數量達到 nThreads 後,不再創建新的線程,而是把任務提交到 LinkedBlockingQueue 中,而且之後線程數始終為 nThreads。

  • 生成只有一個線程的固定線程池,這個更簡單,和上面的一樣,只要設置線程數為 1 就可以了:
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
  • 生成一個需要的時候就創建新的線程,同時可以復用之前創建的線程(如果這個線程當前沒有任務)的線程池:
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

核心線程數為 0,最大線程數為 Integer.MAX_VALUE,keepAliveTime 為 60 秒,任務隊列采用 SynchronousQueue。

這種線程池對於任務可以比較快速地完成的情況有比較好的性能。如果線程空閑了 60 秒都沒有任務,那麽將關閉此線程並從線程池中移除。所以如果線程池空閑了很長時間也不會有問題,因為隨著所有的線程都會被關閉,整個線程池不會占用任何的系統資源。

過程分析:我把 execute 方法的主體黏貼過來,讓大家看得明白些。鑒於 corePoolSize 是 0,那麽提交任務的時候,直接將任務提交到隊列中,由於采用了 SynchronousQueue,所以如果是第一個任務提交的時候,offer 方法肯定會返回 false,因為此時沒有任何 worker 對這個任務進行接收,那麽將進入到最後一個分支來創建第一個 worker。之後再提交任務的話,取決於是否有空閑下來的線程對任務進行接收,如果有,會進入到第二個 if 語句塊中,否則就是和第一個任務一樣,進到最後的 else if 分支。


參考:

https://www.cnblogs.com/MOBIN/p/5436482.html

ExectorService 中invokeeAll 接口說明: https://blog.csdn.net/baidu_23086307/article/details/51740852

線程池使用: https://blog.csdn.net/qq_25806863/article/details/71126867

https://javadoop.com/2017/09/05/java-thread-pool/#toc6

https://blog.csdn.net/lipc_/article/details/52025993

Java 並發編程——Executor框架和線程池原理