1. 程式人生 > >網際網路架構(3):併發程式設計--執行緒池

網際網路架構(3):併發程式設計--執行緒池

3 Executor框架

為了更好的控制多執行緒,JDK提供了一套執行緒框架Executor,幫助開發人員有效的進行執行緒控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,它扮演著執行緒工廠的角色,我們通過Executors可以建立特定功能的執行緒池。

(1) JDK提供的執行緒池

Executors建立執行緒池的方法:

  • newFixedThreadPool()方法,該方法返回一個固定數量的執行緒池,該方法的執行緒數始終不變,當有一個任務提交時,若執行緒池中空閒,則立即執行,若沒有,則會被暫緩在一個任務佇列中,等待有空閒的執行緒去執行。

  • newSingleThreadExecutor()方法,建立一個執行緒的執行緒池,若空閒則執行,若沒有空閒執行緒,則暫緩在任務佇列中

  • newCachedThreadPool()方法,該方法返回一個可根據實際情況調整執行緒個數的執行緒池,不限制最大執行緒數量,若有空閒的執行緒則用空閒的執行緒執行任務,否則建立一個新的執行緒,若無任務則不建立執行緒。並且每一個空閒執行緒會在60秒後自動回收

  • newScheduledThreadPool()方法,該方法返回一個SchededExecutorService物件,但該執行緒池可以指定執行緒的數量,類似於定時器,可以做定時輪詢。

若Executors工廠類無法滿足我們的需求,可以自己去建立自定義的執行緒池,其實Executors工廠類裡面的建立執行緒方法其內部實現均是用了ThreadPoolExecutor這個類,這個類可以自定義執行緒池。構造方法如下

     public ThreadPoolExecutor(int corePoolSize, //核心執行緒數的大小(預設建立的執行緒個數)
                              int maximumPoolSize, //最大執行緒數
                              long keepAliveTime, //每個執行緒的空閒時間
                              TimeUnit unit, //空閒時間的時間單位
                              BlockingQueue<Runnable> workQueue,//執行緒緩衝佇列
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {//拒絕執行時呼叫的方法

    }
  • 提交執行緒到執行緒池有兩種方法,submit和execute。兩者有如下兩個區別:
    • 第一:submit可以提交Runnable或者是Callable介面的實現類,而execute則不行。
    • 第二:submit有返回值,返回值為Future的介面實現類,而execute沒有返回值。
(2) 自定義執行緒池詳解
  • 使用什麼型別的佇列是關鍵

    • 使用有界佇列 : 若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize則優先建立執行緒,若大於corePoolSize,則會將任務加入佇列,若佇列已滿,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒數,若執行緒數大於maximumPoolSize,則執行拒絕策略,或其他自定義方式

    • 使用無界佇列 : LinkedBlockQueue。與有界佇列相比,除非系統資源消耗盡,否則無界的任務佇列不存在入隊失敗的情況。當有新任務到來,系統的執行緒數小於corePoolSize時,則建立新執行緒執行任務。當到達corePoolSize後,就不會繼續增加。若後續任由新的任務加入,而又沒有空閒的執行緒資源,則任務直接進入佇列等待。若任務建立和處理的速度差異很大,無界佇列會保持快速增長,直到耗盡系統記憶體。

  • JDK拒絕策略(屬於ThreadPoolExecutor的內部靜態類)

    • AbortPolicy : 直接丟擲異常,阻止系統正常工作
    • CallerRunsPolicy : 只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前要被丟棄的任務。
    • DiscardOldestPolicy : 丟棄最老的一個請求,嘗試再次提交當前任務。
    • DiscardPolicy : 丟棄無法處理的任務,不給予任何處理。

如果需要自定義拒絕策略,可以實現RejectedExecutionHandler介面,

自定義策略常見處理方式:(一般開發中在拒絕策略中僅僅只是記錄日誌即可)
  • 1、通過某種方式,向請求源反饋拒絕提示(高峰期不建議採用,可能導致請求源一直提示錯誤)
  • 2、記錄日誌,記錄任務的關鍵資訊,然後可以在非高峰期採取日誌解析或者其他方式解析日誌進行重新處理等
  • 3、使用一種其他的方式快取當前任務,之後再進行處理

  • MyTask.java

    public class MyTask implements Runnable{
    
        private int taskId;
        private String taskName;
    
        public MyTask(int taskId, String taskName) {
            super();
            this.taskId = taskId;
            this.taskName = taskName;
        }
    
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                System.out.println(Thread.currentThread().getName() + " - run taskId = " + this.taskId);
                Thread.sleep(5 * 1000);
            } catch (Exception e) {
                // TODO: handle exception
            }
        }
    
        @Override
        public String toString() {
            // TODO Auto-generated method stub
            return Integer.toString(this.taskId);
        }
    
    
    }
    
  • MyRejected.java

    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class MyRejected implements RejectedExecutionHandler{
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // TODO Auto-generated method stub
            System.out.println("自定義處理...");
            System.out.println("當前被拒絕的任務為:" + r.toString());
        }
    
    }
    
  • UseThreadPoolExecutor2.java

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;SynchronousQueue
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class UseThreadPoolExecutor2 {
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<Runnable> queue =
    //              new LinkedBlockingQueue<Runnable>();//使用無界佇列
                    new ArrayBlockingQueue<Runnable>(3);//使用有界佇列
            ExecutorService pool = new ThreadPoolExecutor(
                    1, 
                    2, 
                    120L, 
                    TimeUnit.SECONDS, 
                    queue,
    //              new ThreadPoolExecutor.DiscardOldestPolicy());//jdk自帶拒絕策略,丟棄最老的一個請求
                    new MyRejected());//自定義拒絕策略
    
            for(int i = 1; i <= 20; i++){
                MyTask task = new MyTask(i, "任務" + i);
                pool.execute(task);
            }
    
            Thread.sleep(1000);
            System.out.println("queue size : " + queue.size());
            Thread.sleep(2000);
    
        }
    
    }