1. 程式人生 > >執行緒池 ThreadPool 概念、應用例項

執行緒池 ThreadPool 概念、應用例項

轉自: http://www.2cto.com/kf/201312/267018.html

ThreadPool 先看成員變數Executor mExecutor。
執行緒池的基本思想還是一種物件池的思想,開闢一塊記憶體空間,裡面存放了眾多(未死亡)的執行緒,池中執行緒執行排程由池管理器來處理。當有執行緒任務時,從池中取一個,執行完成後執行緒物件歸池,這樣可以避免反覆建立執行緒物件所帶來的效能開銷,節省了系統的資源。
用執行緒池來管理的好處是,可以保證系統穩定執行,適用與有大量執行緒,高工作量的情景下使用,假如要展示1000張圖片如果建立1000個執行緒去載入,系統肯定會死掉。用執行緒池就可以避免這個問題,可以用5個執行緒輪流執行,5個一組,執行完的執行緒不直接回收而是等待下次執行,這樣對系統的開銷就可以減小不少。
=======================Executor的譯文部分==========================
Executor是Java工具類,執行提交給它的Runnable任務。該介面提供了一種基於任務執行機制的任務提交方法,包括執行緒使用詳細資訊,時序等等。Executor通常用於替代建立多執行緒。例如:你可能會使用以下方式來代替建立執行緒集合中的執行緒new Thread(new(RunnableTask())).start()。


Executor executor = anExecutor;
 executor.execute(new RunnableTask1());
 executor.execute(new RunnableTask2());
 ...
儘管如此,Executor介面沒有明確要求執行過程是非同步的。舉個最簡單的例子,一個Executor可以在呼叫者的執行緒中執行提交的任務。


class DirectExecutor implements Executor {
    public void execute(Runnable r) {
        r.run();
    }
}
更典型的是,任務也可以執行在其他的執行緒而不是呼叫者執行緒。以下程式碼就是在Executor中生成新的執行緒。


class ThreadPerTaskExecutor implements Executor {
    public void execute(Runnable r) {
        new Thread(r).start();
    }
}
很多Executor的實現按照任務的實現方式和時間來分類,下面的程式碼將提交的任務序列化給第二個Executor,闡述了一個組合的Executor。


class SerialExecutor implements Executor {
   final Queue tasks = new ArrayDeque();
   final Executor executor;
   Runnable active;
 
   SerialExecutor(Executor executor) {
     this.executor = executor;
    
   public synchronized void execute(final Runnable r) {
     tasks.offer(new Runnable() {
       public void run() {
         try {
           r.run();
         } finally {
           scheduleNext();
         }
       }
     });
     if (active == null) {
       scheduleNext();
     }
   }
 
   protected synchronized void scheduleNext() {
     if ((active = tasks.poll()) != null) {
       executor.execute(active);
     }
   }
 }
}
以上程式碼簡答講就是執行一個 SerialExecutor時,先執行Runnable的run(),然後再從Tasks任務堆疊中找到當前啟用的任務並執行。
在這個package包中實現的Executor實現了ExecutorService,它是個擴充套件介面。
而threadPoolExecutor類提供了一個擴充套件的執行緒池實現。Executors類給這些Executors提供了方便的工程方法。
記憶體一致性效果:在提交一個Runnable物件給Executor執行之前,執行緒中的行為可能先在另一個執行緒中發生。
=======================Executor的譯文部分==========================
Java裡面執行緒池的頂級介面是Executor,但是嚴格意義上講Executor並不是一個執行緒池,而只是一個執行執行緒的工具。真正的執行緒池介面是ExecutorService。
根據執行緒池的執行策略,Executor的execute()可能在新執行緒中執行,或者線上程池中的某個執行緒中執行,也可能是在呼叫者執行緒中執行。ExecutorService在Executor的基礎上增加了兩個核心方法:
1、Future submit(Runnable task)
2、 Future submit(Callable task)
差異點:這兩個方法都可以向執行緒池提交任務,區別在於Runnable執行完run()有返回值,而Callable執行完call()後有返回值。
共同點:submit都返回Future物件,Future物件可以阻塞執行緒直到執行完畢,也可以取消任務執行和檢測任務是否執行完畢。
在executors類裡面提供了一些靜態工廠,生成一些常用的執行緒池:
1、newSingleThreadExecutor:建立一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。
2、newFixedThreadPool:建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。
3、newCachedThreadPool:建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒(60秒不執行任務)的執行緒,當任務數增加時,此執行緒池又可以智慧的新增新執行緒來處理任務。此執行緒池不會對執行緒池大小做限制,執行緒池大小完全依賴於作業系統(或者說JVM)能夠建立的最大執行緒大小。
4、newScheduledThreadPool:建立一個大小無限的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。
5、newSingleThreadExecutor:建立一個單執行緒的執行緒池。此執行緒池支援定時以及週期性執行任務的需求。
下面再介紹下ThreadPoolExecutor函式,以便對執行緒池有進一步認識:
ThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue workQueue,
RejectedExecutionHandler handler);
corePoolSize: 執行緒池維護執行緒的最少數量
maximumPoolSize:執行緒池維護執行緒的最大數量
keepAliveTime: 執行緒池維護執行緒所允許的空閒時間
unit: 執行緒池維護執行緒所允許的空閒時間的單位
workQueue: 執行緒池所使用的緩衝佇列
handler: 執行緒池對拒絕任務的處理策略
當一個任務通過execute(Runnable)方法欲新增到執行緒池時:
1、如果此時執行緒池中的數量小於corePoolSize,即使執行緒池中的執行緒都處於空閒狀態,也要建立新的執行緒來處理被新增的任務。
2、如果此時執行緒池中的數量等於 corePoolSize,但是緩衝佇列 workQueue未滿,那麼任務被放入緩衝佇列。
3、如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量小於maximumPoolSize,建新的執行緒來處理被新增的任務。
4、如果此時執行緒池中的數量大於corePoolSize,緩衝佇列workQueue滿,並且執行緒池中的數量等於maximumPoolSize,那麼通過 handler所指定的策略來處理此任務。
也就是說,處理任務的優先順序為:
核心執行緒corePoolSize、任務佇列workQueue、最大執行緒maximumPoolSize,如果三者都滿了,使用handler處理被拒絕的任務。
簡單的例子:




ThreadPoolTestMain.java
package threadpool.test;
 
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
 
 
public class ThreadPoolTestMain {
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 4;
    private static final int KEEP_ACTIVE_TIME = 3;
    private static final int TASK_NUM = 10;
    private static final int PRODUCE_SLEEP_TIME = 10;
     
    static public void main(String[] args) {
    // 構造一個執行緒池  
        ThreadPoolExecutor threadPool = new ThreadPoolExecutor(CORE_POOL_SIZE, 
                            MAX_POOL_SIZE, 
                            KEEP_ACTIVE_TIME, 
                            TimeUnit.SECONDS, 
                            new ArrayBlockingQueue<runnable>(3),  
                            new ThreadPoolExecutor.DiscardOldestPolicy());
         
        for (int i = 1; i < TASK_NUM; i++) {
            String name = "Task" + i;
            try {
                System.out.println("ThreadPoolTestMain: put a task: " + name);
                threadPool.execute(new ThreadPoolTask(name));
                Thread.sleep(20);
            } catch (Exception err) {
                err.printStackTrace();
            }
        }
    }
}
 
ThreadPoolTask.java
package threadpool.test;
 
public class ThreadPoolTask implements Runnable {
    private String mTaskName;
    private static int CONSUME_SLEEP_TIME = 2000;
     
    public ThreadPoolTask(String name) {
        mTaskName = name;
    }
     
    @Override
    public void run() {
        // TODO Auto-generated method stub
        System.out.println(Thread.currentThread().getName());
        System.out.println("ThreadPoolTask :" + mTaskName);
         
        try {
            Thread.sleep(CONSUME_SLEEP_TIME);
        } catch (Exception err) {  
            err.printStackTrace();
        }
    }
}</runnable>