1. 程式人生 > >多線程---線程池

多線程---線程池

進行 all priority pool ray point steps 無法 arm


參考博客1:http://www.cnblogs.com/exe19/p/5359885.html
參考博客2:http://www.jianshu.com/p/87bff5cc8d8c

1 . 線程池的體系結構

java.util.concurrent.Executor             [I]是一個頂層接口,在它裏面只聲明了一個方法execute(Runnable),返回值為void,參數為Runnable類型,
    |-- ExecutorService                    [I]繼承了Executor,線程池的主要接口
        |-- AbstractExecutorService        [A]抽象類,實現了ExecutorService中的大部分方法
            
|-- ThreadPoolExecutor [C]線程池的主要實現類 |-- ScheduledExecutorService [I]繼承了ExecutorService,負責線程調度的接口 |-- ScheduledExecutorService [C]繼承 ThreadPoolExecutor,實現 ScheduledExecutorService

2 .線程池的使用

public static void method1() {
        Executor pool = Executors.newFixedThreadPool(5);
        
for(int i=0;i<5;i++){ pool.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName()); } }); } } 1、Executors.newFixedThreadPool(5)初始化一個包含5個線程的線程池pool;
2、通過pool.execute方法提交1個任務,該任務打印當前的線程名; 3、負責執行任務的線程的生命周期都由Executor框架進行管理;

3 . ThreadPoolExecutor類

  • 構造方法
    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

    • corePoolSize:線程池中的核心線程數。在創建了線程池後,默認情況下,線程池中的線程數為0即線程池中並沒有任何線程,而是等待有任務到來才創建線程去執行任務 ( 除非調用了prestartAllCoreThreads()或者prestartCoreThread()這2個預創建線程的方法,這兩個方法會在沒有任務到來之前就創建corePoolSize個線程或者一個線程 )當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到阻塞緩存隊列當中。
    • maximumPoolSize:線程池中允許的最大線程數。如果當前阻塞隊列滿了,且繼續提交任務,則創建新的線程執行任務,前提是當前線程數小於maximumPoolSize;
    • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認情況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime才會起作用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,如果一個線程空閑的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。但是如果調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起作用,直到線程池中的線程數為0;
    • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
      TimeUnit.DAYS;              //天
      TimeUnit.HOURS;             //小時
      TimeUnit.MINUTES;           //分鐘
      TimeUnit.SECONDS;           //秒
      TimeUnit.MILLISECONDS;      //毫秒
      TimeUnit.MICROSECONDS;      //微妙
      TimeUnit.NANOSECONDS;       //納秒
    • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,一般來說,這裏的阻塞隊列有以下幾種選擇:
      1、ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;
      2、LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量通常要高於ArrayBlockingQuene;
      3、SynchronousQuene:一個不存儲元素的阻塞隊列,每個插入操作必須等到另一個線程調用移除操作,否則插入操作一直處於阻塞狀態,吞吐量通常要高於LinkedBlockingQuene;
      4、priorityBlockingQuene:具有優先級的無界阻塞隊列;
      其中ArrayBlockingQueue和PriorityBlockingQueue使用較少,一般使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
    • threadFactory:線程工廠,主要用來創建線程;
    • handler:線程池的飽和策略,當阻塞隊列滿了,且沒有空閑的工作線程,如果繼續提交任務,必須采取一種策略處理該任務,線程池提供了4種策略:
      1、AbortPolicy:丟棄任務並拋出RejectedExecutionException異常,默認策略;
      2、DiscardPolicy:也是丟棄任務,但是不拋出異常。
      3、DiscardOldestPolicy:丟棄隊列最前面的任務,然後重新嘗試執行任務(重復此過程)
      4、CallerRunsPolicy:用調用者所在的線程來執行任務;
      當然也可以根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。
    • execute()方法

      public void execute(Runnable command) {
            if (command == null)
                throw new NullPointerException();
            /*
             * Proceed in 3 steps:
             * 此過程分為3步:
             * 1. If fewer than corePoolSize threads are running, try to
             * start a new thread with the given command as its first
             * task.  The call to addWorker atomically checks runState and
             * workerCount, and so prevents false alarms that would add
             * threads when it shouldn‘t, by returning false.
             * 1.如果正在運行的線程數小於corePoolSize,則嘗試啟動一個新
             * 的線程來執行指定的任務。對addWorker 方法的調用會原子性
             * 的檢查runState (運行狀態)和workerCount(工作集數量 ),
             * 通過返回false阻止不應該出現的添加錯誤。
             *
             * 2. If a task can be successfully queued, then we still need
             * to double-check whether we should have added a thread
             * (because existing ones died since last checking) or that
             * the pool shut down since entry into this method. So we
             * recheck state and if necessary roll back the enqueuing if
             * stopped, or start a new thread if there are none.
             * 2. 如果一個線程可以成功加入隊列,無論我們已經將線
             * 程添加進去了還是在進入這個方法時線程池已經關閉了都需
             * 要繼續做兩次檢查(因為上次檢查之後可能存在一個死掉的
             * 線程)所以就再次檢查狀態,並且如果必要就會滾加入隊列的
             * 操作或者在一個也沒有的時候啟動一個新的線程。
             * 
             * 3. If we cannot queue task, then we try to add a new
             * thread.  If it fails, we know we are shut down or saturated
             * and so reject the task.        
             * 如果任務無法加入隊列,就嘗試添加一個新的線程,如果失敗,
             * 說明線程池關閉了或者隊列已經滿了所以拒絕了這次任務
             */
            int c = ctl.get();
            if (workerCountOf(c) < corePoolSize) {
                if (addWorker(command, true))
                    return;
                c = ctl.get();
            }
            if (isRunning(c) && workQueue.offer(command)) {
                int recheck = ctl.get();
                if (! isRunning(recheck) && remove(command))
                    reject(command);
                else if (workerCountOf(recheck) == 0)
                    addWorker(null, false);
            }
            else if (!addWorker(command, false))
                reject(command);
        }

    • submit()方法
      此方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並沒有對其進行重寫,這個方法也是用來向線程池提交任務的,但是它和execute()方法不同,它能夠返回任務執行的結果,去看submit()方法的實現,會發現它實際上還是調用的execute()方法,只不過它利用了Future來獲取任務執行結果

    • shutdown()和shutdownNow()方法
      這兩個方法是用來關閉線程池的。

多線程---線程池