1. 程式人生 > >Java粗淺認識-併發程式設計(五)-執行緒池

Java粗淺認識-併發程式設計(五)-執行緒池

執行緒池

先來總攬一下執行緒池結構

ExecutorService

以上是執行緒池結構,常用的工具java.util.concurrent.Executors

結構如下

Executors

在Executors中常用的方法

Executors.newCachedThreadPool()

建立執行緒池核心poolSize = 0,最大poolSize=Integer.MAX_VALUE,執行緒任務執行完後,如果沒有新任務,會在60s後被回收,使用SynchronousQueue佇列做阻塞佇列,佇列容量是0,每次新增時,必須阻塞等待刪除,每次刪除都要等待新增,對任務工作時間比較小的任務,效率很高,對執行緒的影響如果一直有任務新增,並且正在工作的執行緒都沒有結束,會一直new新執行緒出來執行,執行緒數量上限是Integer.MAX_VALUE.,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。

例項程式碼

    private static void newCachedThreadPool() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Task());
        }
        executorService.shutdown();
    }

Executors.newFixedThreadPool(5)

固定執行緒池大小的執行緒,核心poolSize=maxPoolSize,執行緒一旦啟動,就不會被回收,如果其中某個執行緒掛掉了,會新啟動一個執行緒來填充,使用LinkedBlockingQueue阻塞佇列為等待佇列,如果執行緒全都在執行,那麼新新增進來的任務全部會放到阻塞佇列中去,最大值是Integer.MAX_VALUE,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。

例項程式碼

    private static void newFixedThreadPool() {
        ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 100; i++) {
            executorService.execute(new Task());
        }
        executorService.shutdown();
    }


Executors.newScheduledThreadPool(5)

編排任務的執行緒池,定時執行,必須初始化一個核心執行緒數量大小,最大執行緒數Integer.MAX_VALUE,執行緒一旦new出來就不會回收,使用DelayedWorkQueue任務編排的阻塞佇列,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。

示例程式碼,有三種類型,

第一種是直接延遲一定時間後,執行一次任務;

第二種,在延遲一定時間後,第一次執行,後續執行等待前一個執行緒執行完畢後再延遲一定時候後執行;

第三種,在延遲一定時候後,第一次執行,後續執行等待前一個任務執行開始的時間計時延遲,如果已經到了就執行,如果沒到等待時刻到來執行。

/**
     * 在initialDelay時間延遲後第一次執行,之後以上一個任務開始時間點開始計算,每間隔delay後執行一次,如果delay已經到了
     * 則立即執行,如果沒有到,等待時刻到來再執行
     */
    private static void scheduleAtFixedRate() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleAtFixedRate(new Task(), 5, 10, TimeUnit.SECONDS);
//        executorService.shutdown();
    }


    /**
     * 在initialDelay時間延遲後第一次執行,之後以上一個任務結束時間點開始計算,每間隔delay後執行一次
     */
    private static void scheduleWithFixedDelay() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.scheduleWithFixedDelay(new Task(), 5, 10, TimeUnit.SECONDS);
//        executorService.shutdown();
    }

    /**
     * 延遲一定時間後執行
     */
    private static void scheduledDelayThreadPool() {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
        executorService.schedule(new Task(), 5, TimeUnit.SECONDS);
        executorService.shutdown();
    }


Executors.newSingleThreadExecutor()

單執行緒的執行緒池,如果在任務時,執行緒掛掉了,會立即新啟一個執行緒,最大執行緒1,核心執行緒1,阻塞佇列LinkedBlockingQueue,不會回收執行緒,預設拒絕策略,AbortPolicy,直接拋錯,結束當前執行緒。

    /**
     * 單執行緒的執行緒池,如果在任務時,執行緒掛掉了,會立即新啟一個執行緒
     */
    private static void newSingleThreadExecutor() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(new Task());
        executorService.shutdown();
    }

Executors.newWorkStealingPool() java 1.8

1.8新增的執行緒池

    public static void newWorkStealingPool() throws InterruptedException {
        //forkJoin
        ExecutorService forkJoinPool = Executors.newWorkStealingPool();
        List<CallableTask> callableTaskList = new ArrayList<>();
        callableTaskList.add(new CallableTask());
        callableTaskList.add(new CallableTask());
        callableTaskList.add(new CallableTask());
        List<Future<Integer>> futures = forkJoinPool.invokeAll(callableTaskList);
        futures.forEach(integerFuture -> {
            try {
                System.out.println(integerFuture.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        });
    }

這個例項不太好,下次想好了,再改哈,根據上面的層級關係圖,forkJoin不屬於ThreadPoolExecutor,接下來我們看看ForkJoinPool執行緒池的實現初始化時,執行緒數是核心數,非同步,異常處理為null

在使用ThreadPoolExecutor時,建議自己建立,不使用Executors工具類建立。用Executors裡面對拒絕任務新增時很不友好,而使用自己建立ThreadPoolExecutor時,可以自己重寫拒絕策略。

示例程式碼

    private static void threadPoolExecutor() {
        ExecutorService executorService = new ThreadPoolExecutor(4, 16, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1 << 4),new MyRejectedExecutionHandler());
    }

    public static class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("需要日誌儲存的訊息。" + r.toString() + executor.toString());
        }
    }

優先順序佇列線上程池中的使用

模擬VIP使用者,VIP等級高的優先執行。

    public static class VIP implements Comparable<VIP>, Runnable {
        private String name;
        private int vipLevel;

        public VIP(String name, int vipLevel) {
            this.name = name;
            this.vipLevel = vipLevel;
        }

        @Override
        public int compareTo(VIP o) {
            return Integer.compare(o.vipLevel, this.vipLevel);
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " " + name + " 執行。vip等級" + vipLevel);
        }
    }

    private static void threadPoolExecutor() {
        PriorityBlockingQueue<Runnable> priorityBlockingQueue = new PriorityBlockingQueue();

        ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("VIP 執行緒-%d").build();
        ExecutorService executorService =
                new ThreadPoolExecutor(4,
                        16,
                        10,
                        TimeUnit.SECONDS,
                        priorityBlockingQueue,
                        threadFactory,
                        new MyRejectedExecutionHandler());
        for (int i = 0; i < 100; i++) {
            int vipLevel = i % 2;
            executorService.execute(new VIP("使用者" + i, vipLevel));
        }
    }

總結

本章節對執行緒池的處理進行了講解,需要多加練習。