1. 程式人生 > >JAVA Executor框架建立執行緒池

JAVA Executor框架建立執行緒池

為了更好的控制多執行緒,JDK提供理論一套執行緒框架Executor,幫助開發人員有效的進行執行緒控制。它們都在java.util.concurrent包中,是JDK併發包的核心。其中有一個比較重要的類:Executors,他扮演著執行緒工廠的角色,我們通過Executors可以建立特定功能的執行緒池。

一、JDK提供的四類執行緒池

Executors建立執行緒池的四種方式:

  ExecutorService pool1=Executors.newFixedThreadPool(10);

 ExecutorService pool2=Executors.newSingleThreadExecutor();

ExecutorService pool3=Executors.newCachedThreadPool(); 

  1. newFixedThreadPool()方法,該方法返回一個固定數量的執行緒池,該方法的執行緒數始終不變,當有一個任務提交時,若執行緒池中空閒,則立即執行,若沒有,則會被暫緩在一個任務佇列中等待有空閒的執行緒去執行。
  2. newSingleThreadExecutor()方法,建立一個執行緒的執行緒池,若空閒則執行,若沒有空閒執行緒則暫緩在任務佇列中。
  3. newCachedThreadPool()方法,返回一個可根據實際情況調整執行緒個數的執行緒池,不限制最大執行緒數量,若有空閒的執行緒則執行任務,若無任務則不建立執行緒。並且每一個空執行緒會在60秒後自動回收。
  4. newScheduledThreadPool()方法,該方法返回一個SchededExecutorService物件,但該執行緒池可以指定執行緒的數量。

 ExecutorService pool4=Executors.newScheduledThreadPool(5);

這四個執行緒池底層核心都是通過new ThreadPoolExecutor(..........)

 newScheduledThreadPool()實現定時器的功能

定時任務示例:

public class ScheduledJob {
    public static void main(String[] args) {
        Temp t=new Temp();
        ScheduledExecutorService scheduled=Executors.newScheduledThreadPool(1);
        //public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
        //        long initialDelay,
        //        long delay,
        //        TimeUnit unit);command:執行緒任務,initialDelay:初始執行時間,delay:間隔執行時間,unit:時間單位
        ScheduledFuture<?> scheduledTask=scheduled.scheduleWithFixedDelay(t, 5, 3, TimeUnit.SECONDS);
    }
}
class Temp implements Runnable{

    @Override
    public void run() {
        System.out.println("gogogogo");
    }   
}

二、自定義執行緒池

若Executors工廠類無法滿足我們的需求,可以自己去建立自定義的執行緒池,其實Executors工廠類裡面的建立執行緒池方法其內部實現均是用ThreadPoolExecutor這個類,這個類可以自定義執行緒池。構造方法如下:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler){......} 

corePoolSize:當前核心執行緒

maximumPoolSize:最大執行緒數

keepAliveTime:每個執行緒保持活著的時間

unit:每個執行緒保持活著的時間的單位

workQueue:快取佇列,當執行緒達到上限,用來存放任務

handler:拒絕策略(拒絕執行任務/寫日誌等操作)

這個構造方法對於佇列是什麼型別的比較關鍵:

有界的任務佇列時:若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize,則優先建立執行緒,若大於corePoolSize,則會將任務加入佇列,若佇列已滿,則在匯流排程數不大於maximumPoolSize的前提下,建立新的執行緒,若執行緒數大於maximumPoolSize,則執行拒絕策略。或其他自定義方式。

無界的佇列任務時: LinkedBlockingQueue。與有界佇列相比,除非系統資源耗盡,否則無界的任務佇列不存在任務入隊失敗的情況。當有新任務到來,系統的執行緒數小於corePoolSize時,則新建執行緒執行任務。當達到corePoolSize後,就不會繼續增加。若後續仍有新的任務加入,而又沒有空閒的執行緒資源,則任務直接進入佇列等待。若任務建立和處理的速度差異很大,無界佇列會保持快速增長,直到耗盡系統記憶體。

JDK拒絕策略:

AbortPolicy:直接丟擲異常阻止系統正常工作

CallerRunsPolicy:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前被丟棄的任務。

DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。

DisCardPolicy:丟棄無法處理的任務,不給予任何處理。

如果需要自定義拒絕策略可以實現RejectedExecutlonHandler介面。

 有界的任務佇列示例:

 

public class UseThreadPoolExecutor1 {
    
    public static void main(String[] args) {
        /**
         * 有界的任務佇列時:若有新的任務需要執行,如果執行緒池實際執行緒數小於corePoolSize,
         * 則優先建立執行緒,若大於corePoolSize,則會將任務加入佇列,
         * 若佇列已滿,則在匯流排程數不大於maximumPoolSize的前提下,
         * 建立新的執行緒,若執行緒數大於maximumPoolSize,則執行拒絕策略。或其他自定義方式。
         */
        ThreadPoolExecutor pool=new ThreadPoolExecutor(
                1,
                2,
                60,
                TimeUnit.SECONDS,
                new ArrayBlockingQueue<Runnable>(3));
        
        MyTask mt1=new MyTask(1, "任務1");
        MyTask mt2=new MyTask(2, "任務2");
        MyTask mt3=new MyTask(3, "任務3");
        MyTask mt4=new MyTask(4, "任務4");
        MyTask mt5=new MyTask(5, "任務5");
        MyTask mt6=new MyTask(6, "任務6");
        
        pool.execute(mt1);
        pool.execute(mt2);
        pool.execute(mt3);
        pool.execute(mt4);
        pool.execute(mt5);
//        pool.execute(mt6);
        
        pool.shutdown();
    }
}
class MyTask implements Runnable{
    
    private int id;
    private String name;

    
    public MyTask(int id, String name) {
        super();
        this.id = id;
        this.name = name;
    }


    public int getId() {
        return id;
    }


    public void setId(int id) {
        this.id = id;
    }


    public String getName() {
        return name;
    }


    public void setName(String name) {
        this.name = name;
    }
    @Override
    public String toString() {
        return "MyTask [id=" + id + ", name=" + name + ", getId()=" + getId() + ", getName()=" + getName()
                + ", getClass()=" + getClass() + ", hashCode()=" + hashCode() + ", toString()=" + super.toString()
                + "]";
    }

    @Override
    public void run() {        
        try {
            System.out.println("taskid:"+this.id);
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

}

輸出結果:

taskid:1
taskid:5
taskid:2
taskid:3
taskid:4

從上面輸出結果的順序可以看出先是初始建立執行緒執行任務,當達到corePoolSize值時將任務加入到有界隊裡,當佇列達到上限是,則建立新的執行緒執行任務。

無界佇列示例:

public class UseThreadPoolExecutor2 implements Runnable{

    private static AtomicInteger count=new AtomicInteger(0);
    @Override
    public void run() {
        try {
            int temp=count.incrementAndGet();
            System.out.println("任務"+temp);
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    public static void main(String[] args) throws Exception {
        //無界佇列
        LinkedBlockingQueue<Runnable> queue=new LinkedBlockingQueue<>();
        ExecutorService exService=new ThreadPoolExecutor(
                5,
                10,
                120L,
                TimeUnit.SECONDS,
                queue);
        for(int i=0;i<20;i++) {
            exService.execute(new UseThreadPoolExecutor2());
        }
        Thread.sleep(2000);
        System.out.println("queue size:"+queue.size());        
    }
}

 執行結果:

任務1
任務2
任務3
任務4
任務5
queue size:15
任務6
任務7
任務9
任務8
任務10
任務11
任務12
任務15
任務14
任務13
任務16
任務17
任務18
任務19
任務20

拒絕策略示例:

public class MyRejected implements RejectedExecutionHandler{

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        //執行log記錄/傳送請求通知客戶端拒絕此任務等操作
        System.out.println("自定義處理。。。");
        System.out.println("當前被拒絕的任務:"+r.toString());
    }

}

一般在拒絕策略中根據自己程式場景需求來處理,最好採用記錄日誌等