1. 程式人生 > >【本人禿頂程式設計師】面試-執行緒池的成長之路

【本人禿頂程式設計師】面試-執行緒池的成長之路

←←←←←←←←←←←← 我都禿頂了,還不點關注!

背景

相信大家在面試過程中遇到面試官問執行緒的很多,執行緒過後就是執行緒池了。從易到難,都是這麼個過程,還有就是確實很多人在工作中接觸執行緒池比較少,最多的也就是建立一個然後往裡面提交執行緒,對於一些經驗很豐富的面試官來說,一下就可以問出很多執行緒池相關的問題,與其被問的暈頭轉向,還不如好好學習。此時不努力更待何時。

什麼是執行緒池?

執行緒池是一種多執行緒處理形式,處理過程中將任務提交到執行緒池,任務的執行交由執行緒池來管理。

如果每個請求都建立一個執行緒去處理,那麼伺服器的資源很快就會被耗盡,使用執行緒池可以減少建立和銷燬執行緒的次數,每個工作執行緒都可以被重複利用,可執行多個任務。

如果用生活中的列子來說明,我們可以把執行緒池當做一個客服團隊,如果同時有1000個人打電話進行諮詢,按照正常的邏輯那就是需要1000個客服接聽電話,服務客戶。現實往往需要考慮到很多層面的東西,比如:資源夠不夠,招這麼多人需要費用比較多。正常的做法就是招100個人成立一個客服中心,當有電話進來後分配沒有接聽的客服進行服務,如果超出了100個人同時諮詢的話,提示客戶等待,稍後處理,等有客服空出來就可以繼續服務下一個客戶,這樣才能達到一個資源的合理利用,實現效益的最大化。

Java中的執行緒池種類

1. newSingleThreadExecutor

建立方式:

ExecutorService pool = Executors.newSingleThreadExecutor();

一個單執行緒的執行緒池。這個執行緒池只有一個執行緒在工作,也就是相當於單執行緒序列執行所有任務。如果這個唯一的執行緒因為異常結束,那麼會有一個新的執行緒來替代它。此執行緒池保證所有任務的執行順序按照任務的提交順序執行。

使用方式:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
            });
        }
    }
}

輸出結果如下:

pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....
pool-1-thread-1    開始發車啦....

從輸出的結果我們可以看出,一直只有一個執行緒在執行。

2.newFixedThreadPool

建立方式:

ExecutorService pool = Executors.newFixedThreadPool(10);

建立固定大小的執行緒池。每次提交一個任務就建立一個執行緒,直到執行緒達到執行緒池的最大大小。執行緒池的大小一旦達到最大值就會保持不變,如果某個執行緒因為執行異常而結束,那麼執行緒池會補充一個新執行緒。

使用方式:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 10; i++) {
            pool.execute(() -> {
                System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
            });
        }
    }
}

輸出結果如下:

pool-1-thread-1    開始發車啦....
pool-1-thread-4    開始發車啦....
pool-1-thread-3    開始發車啦....
pool-1-thread-2    開始發車啦....
pool-1-thread-6    開始發車啦....
pool-1-thread-7    開始發車啦....
pool-1-thread-5    開始發車啦....
pool-1-thread-8    開始發車啦....
pool-1-thread-9    開始發車啦....
pool-1-thread-10 開始發車啦....

3. newCachedThreadPool

建立方式:

ExecutorService pool = Executors.newCachedThreadPool();

建立一個可快取的執行緒池。如果執行緒池的大小超過了處理任務所需要的執行緒,那麼就會回收部分空閒的執行緒,當任務數增加時,此執行緒池又新增新執行緒來處理任務。

使用方式如上2所示。

4.newScheduledThreadPool

建立方式:

ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);

此執行緒池支援定時以及週期性執行任務的需求。

使用方式:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class ThreadPool {
    public static void main(String[] args) {
        ScheduledExecutorService pool = Executors.newScheduledThreadPool(10);
        for (int i = 0; i < 10; i++) {
            pool.schedule(() -> {
                System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
            }, 10, TimeUnit.SECONDS);
        }
    }
}

上面演示的是延遲10秒執行任務,如果想要執行週期性的任務可以用下面的方式,每秒執行一次

//pool.scheduleWithFixedDelay也可以
pool.scheduleAtFixedRate(() -> {
                System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
}, 1, 1, TimeUnit.SECONDS);

5.newWorkStealingPool
newWorkStealingPool是jdk1.8才有的,會根據所需的並行層次來動態建立和關閉執行緒,通過使用多個佇列減少競爭,底層用的ForkJoinPool來實現的。ForkJoinPool的優勢在於,可以充分利用多cpu,多核cpu的優勢,把一個任務拆分成多個“小任務”,把多個“小任務”放到多個處理器核心上並行執行;當多個“小任務”執行完成之後,再將這些執行結果合併起來即可。

說說執行緒池的拒絕策略

當請求任務不斷的過來,而系統此時又處理不過來的時候,我們需要採取的策略是拒絕服務。RejectedExecutionHandler介面提供了拒絕任務處理的自定義方法的機會。在ThreadPoolExecutor中已經包含四種處理策略。

  • AbortPolicy策略:該策略會直接丟擲異常,阻止系統正常工作。

public static class AbortPolicy implements RejectedExecutionHandler {
    public AbortPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
    }
}
  • CallerRunsPolicy 策略:只要執行緒池未關閉,該策略直接在呼叫者執行緒中,運行當前的被丟棄的任務。

.

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    public CallerRunsPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
                r.run();
        }
    }
}
  • DiscardOleddestPolicy策略: 該策略將丟棄最老的一個請求,也就是即將被執行的任務,並嘗試再次提交當前任務。

.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {
    public DiscardOldestPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            e.getQueue().poll();
            e.execute(r);
        }
    }
}
  • DiscardPolicy策略:該策略默默的丟棄無法處理的任務,不予任何處理。

.

public static class DiscardPolicy implements RejectedExecutionHandler {
    public DiscardPolicy() { }
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
    }
}

除了JDK預設為什麼提供的四種拒絕策略,我們可以根據自己的業務需求去自定義拒絕策略,自定義的方式很簡單,直接實現RejectedExecutionHandler介面即可

比如Spring integration中就有一個自定義的拒絕策略CallerBlocksPolicy,將任務插入到佇列中,直到佇列中有空閒並插入成功的時候,否則將根據最大等待時間一直阻塞,直到超時

package org.springframework.integration.util;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
public class CallerBlocksPolicy implements RejectedExecutionHandler {
    private static final Log logger = LogFactory.getLog(CallerBlocksPolicy.class);
    private final long maxWait;
    /**
     * @param maxWait The maximum time to wait for a queue slot to be
     * available, in milliseconds.
     */
    public CallerBlocksPolicy(long maxWait) {
        this.maxWait = maxWait;
    }
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (!executor.isShutdown()) {
            try {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (logger.isDebugEnabled()) {
                    logger.debug("Attempting to queue task execution for " + this.maxWait + " milliseconds");
                }
                if (!queue.offer(r, this.maxWait, TimeUnit.MILLISECONDS)) {
                    throw new RejectedExecutionException("Max wait time expired to queue task");
                }
                if (logger.isDebugEnabled()) {
                    logger.debug("Task execution queued");
                }
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted", e);
            }
        }
        else {
            throw new RejectedExecutionException("Executor has been shut down");
        }
    }
}

定義好之後如何使用呢?光定義沒用的呀,一定要用到執行緒池中呀,可以通過下面的方式自定義執行緒池,指定拒絕策略。

BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<>(100);
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    10, 100, 10, TimeUnit.SECONDS, workQueue, new CallerBlocksPolicy());

execute和submit的區別?

在前面的講解中,我們執行任務是用的execute方法,除了execute方法,還有一個submit方法也可以執行我們提交的任務。

這兩個方法有什麼區別呢?分別適用於在什麼場景下呢?我們來做一個簡單的分析。

execute適用於不需要關注返回值的場景,只需要將執行緒丟到執行緒池中去執行就可以了

public class ThreadPool {
    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        pool.execute(() -> {
            System.out.println(Thread.currentThread().getName() + "\t開始發車啦....");
        });
    }
}

submit方法適用於需要關注返回值的場景,submit方法的定義如下:

public interface ExecutorService extends Executor {
  ...
  <T> Future<T> submit(Callable<T> task);
  <T> Future<T> submit(Runnable task, T result);
  Future<?> submit(Runnable task);
  ...
}

其子類AbstractExecutorService實現了submit方法,可以看到無論引數是Callable還是Runnable,最終都會被封裝成RunnableFuture,然後再呼叫execute執行。

/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException       {@inheritDoc}
 */
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

下面我們來看看這三個方法分別如何去使用:

submit(Callable task);

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Future<String> future = pool.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                return "Hello";
            }
        });
        String result = future.get();
        System.out.println(result);
    }
}

submit(Runnable task, T result);

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(10);
        Data data = new Data();
        Future<Data> future = pool.submit(new MyRunnable(data), data);
        String result = future.get().getName();
        System.out.println(result);
    }
}
class Data {
    String name;
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
}
class MyRunnable implements Runnable {
    private Data data;
    public MyRunnable(Data data) {
        this.data = data;
    }
    @Override
    public void run() {
        data.setName("yinjihuan");
    }
}

Future<?> submit(Runnable task);
直接submit一個Runnable是拿不到返回值的,返回值就是null.

五種執行緒池的使用場景

  • newSingleThreadExecutor:一個單執行緒的執行緒池,可以用於需要保證順序執行的場景,並且只有一個執行緒在執行。

  • newFixedThreadPool:一個固定大小的執行緒池,可以用於已知併發壓力的情況下,對執行緒數做限制。

  • newCachedThreadPool:一個可以無限擴大的執行緒池,比較適合處理執行時間比較小的任務。

  • newScheduledThreadPool:可以延時啟動,定時啟動的執行緒池,適用於需要多個後臺執行緒執行週期任務的場景。

  • newWorkStealingPool:一個擁有多個任務佇列的執行緒池,可以減少連線數,建立當前可用cpu數量的執行緒來並行執行。

執行緒池的關閉

關閉執行緒池可以呼叫shutdownNow和shutdown兩個方法來實現

shutdownNow:對正在執行的任務全部發出interrupt(),停止執行,對還未開始執行的任務全部取消,並且返回還沒開始的任務列表

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            System.err.println(i);
            pool.execute(() -> {
                try {
                    Thread.sleep(30000);
                    System.out.println("--");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(1000);
        List<Runnable> runs = pool.shutdownNow();
    }
}

上面的程式碼模擬了立即取消的場景,往執行緒池裡新增5個執行緒任務,然後sleep一段時間,執行緒池只有一個執行緒,如果此時呼叫shutdownNow後應該需要中斷一個正在執行的任務和返回4個還未執行的任務,控制檯輸出下面的內容:

0
1
2
3
4
[fs.ThreadPool$$Lambda$1/[email protected], 
fs.ThreadPool$$Lambda$1/[email protected], 
fs.ThreadPool$$Lambda$1/[email protected], 
fs.ThreadPool$$Lambda$1/[email protected]]
java.lang.InterruptedException: sleep interrupted
    at java.lang.Thread.sleep(Native Method)
    at fs.ThreadPool.lambda$0(ThreadPool.java:15)
    at fs.ThreadPool$$Lambda$1/990368553.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

shutdown:當我們呼叫shutdown後,執行緒池將不再接受新的任務,但也不會去強制終止已經提交或者正在執行中的任務

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            System.err.println(i);
            pool.execute(() -> {
                try {
                    Thread.sleep(30000);
                    System.out.println("--");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(1000);
        pool.shutdown();
        pool.execute(() -> {
            try {
                Thread.sleep(30000);
                System.out.println("--");
            } catch (Exception e) {
                e.printStackTrace();
            }
        });
    }
}

上面的程式碼模擬了正在執行的狀態,然後呼叫shutdown,接著再往裡面新增任務,肯定是拒絕新增的,請看輸出結果:

0
1
2
3
4
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task fs.ThreadPool$$Lambda$2/[email protected] rejected from [email protected][Shutting down, pool size = 1, active threads = 1, queued tasks = 4, completed tasks = 0]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at fs.ThreadPool.main(ThreadPool.java:24)

還有一些業務場景下需要知道執行緒池中的任務是否全部執行完成,當我們關閉執行緒池之後,可以用isTerminated來判斷所有的執行緒是否執行完成,千萬不要用isShutdown,isShutdown只是返回你是否呼叫過shutdown的結果。

public class ThreadPool {
    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        for (int i = 0; i < 5; i++) {
            System.err.println(i);
            pool.execute(() -> {
                try {
                    Thread.sleep(3000);
                    System.out.println("--");
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        Thread.sleep(1000);
        pool.shutdown();
        while(true){  
            if(pool.isTerminated()){  
                System.out.println("所有的子執行緒都結束了!");  
                break;  
            }  
            Thread.sleep(1000);    
        }  
    }
}

自定義執行緒池

在實際的使用過程中,大部分我們都是用Executors去建立執行緒池直接使用,如果有一些其他的需求,比如指定執行緒池的拒絕策略,阻塞佇列的型別,執行緒名稱的字首等等,我們可以採用自定義執行緒池的方式來解決。

如果只是簡單的想要改變執行緒名稱的字首的話可以自定義ThreadFactory來實現,在Executors.new…中有一個ThreadFactory的引數,如果沒有指定則用的是DefaultThreadFactory。

自定義執行緒池核心在於建立一個ThreadPoolExecutor物件,指定引數

下面我們看下ThreadPoolExecutor建構函式的定義:

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) ;
  • corePoolSize
    執行緒池大小,決定著新提交的任務是新開執行緒去執行還是放到任務佇列中,也是執行緒池的最最核心的引數。一般執行緒池開始時是沒有執行緒的,只有當任務來了並且執行緒數量小於corePoolSize才會建立執行緒。
  • maximumPoolSize
    最大執行緒數,執行緒池能建立的最大執行緒數量。
  • keepAliveTime
    線上程數量超過corePoolSize後,多餘空閒執行緒的最大存活時間。
  • unit
    時間單位
  • workQueue
    存放來不及處理的任務的佇列,是一個BlockingQueue。
  • threadFactory
    生產執行緒的工廠類,可以定義執行緒名,優先順序等。
  • handler
    拒絕策略,當任務來不及處理的時候,如何處理, 前面有講解。

瞭解上面的引數資訊後我們就可以定義自己的執行緒池了,我這邊用ArrayBlockingQueue替換了LinkedBlockingQueue,指定了佇列的大小,當任務超出佇列大小之後使用CallerRunsPolicy拒絕策略處理。

這樣做的好處是嚴格控制了佇列的大小,不會出現一直往裡面新增任務的情況,有的時候任務處理的比較慢,任務數量過多會佔用大量記憶體,導致記憶體溢位。

當然你也可以在提交到執行緒池的入口進行控制,比如用CountDownLatch, Semaphore等。

/**
 * 自定義執行緒池<br>
 * 預設的newFixedThreadPool裡的LinkedBlockingQueue是一個無邊界佇列,如果不斷的往裡加任務,最終會導致記憶體的不可控<br>
 * 增加了有邊界的佇列,使用了CallerRunsPolicy拒絕策略
 * @author yinjihuan
 *
 */
public class FangjiaThreadPoolExecutor {
    private static ExecutorService executorService = newFixedThreadPool(50);
    private static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(10000), new DefaultThreadFactory(), new CallerRunsPolicy());
    }
    public static void execute(Runnable command) {
        executorService.execute(command);
    }
    public static void shutdown() {
        executorService.shutdown();
    }
    static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;
        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "FSH-pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-";
        }
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}

寫在最後:

禿頂程式設計師的不易,看到這裡,點了關注吧!
點關注,不迷路,持續更新!!!