1. 程式人生 > >springboot之多工並行+執行緒池處理

springboot之多工並行+執行緒池處理

最近專案中做到一個關於批量發簡訊的業務,如果使用者量特別大的話,不能使用單執行緒去發簡訊,只能嘗試著使用多工來完成!我們的專案使用到了方式二,即Future的方案

image

Java 執行緒池
Java通過Executors提供四種執行緒池,分別為:

newCachedThreadPool建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。
newFixedThreadPool 建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。
newScheduledThreadPool 建立一個定長執行緒池,支援定時及週期性任務執行。
newSingleThreadExecutor 建立一個單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。

優點
重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。
可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
提供定時執行、定期執行、單執行緒、併發數控制等功能。
方式一(CountDownLatch)
public class StatsDemo {
    final static SimpleDateFormat sdf = new SimpleDateFormat(
            "yyyy-MM-dd HH:mm:ss");

    final static String startTime = sdf.format(new Date());

    /**
     * IO密集型任務  = 一般為2*CPU核心數(常出現於執行緒中:資料庫資料互動、檔案上傳下載、網路資料傳輸等等)
     * CPU密集型任務 = 一般為CPU核心數+1(常出現於執行緒中:複雜演算法)
     * 混合型任務  = 視機器配置和複雜度自測而定
     */
private static int corePoolSize = Runtime.getRuntime().availableProcessors(); /** * public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime, * TimeUnit unit,BlockingQueue<Runnable> workQueue) * corePoolSize用於指定核心執行緒數量 * maximumPoolSize指定最大執行緒數 * keepAliveTime和TimeUnit指定執行緒空閒後的最大存活時間 * workQueue則是執行緒池的緩衝佇列,還未執行的執行緒會在佇列中等待 * 監控佇列長度,確保佇列有界 * 不當的執行緒池大小會使得處理速度變慢,穩定性下降,並且導致記憶體洩露。如果配置的執行緒過少,則佇列會持續變大,消耗過多記憶體。 * 而過多的執行緒又會 由於頻繁的上下文切換導致整個系統的速度變緩——殊途而同歸。佇列的長度至關重要,它必須得是有界的,這樣如果執行緒池不堪重負了它可以暫時拒絕掉新的請求。 * ExecutorService 預設的實現是一個無界的 LinkedBlockingQueue。 */
private static ThreadPoolExecutor executor = new ThreadPoolExecutor(corePoolSize, corePoolSize+1, 10l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1000)); public static void main(String[] args) throws InterruptedException { CountDownLatch latch = new CountDownLatch(5); //使用execute方法 executor.execute(new Stats("任務A", 1000, latch)); executor.execute(new Stats("任務B", 1000, latch)); executor.execute(new Stats("任務C", 1000, latch)); executor.execute(new Stats("任務D", 1000, latch)); executor.execute(new Stats("任務E", 1000, latch)); latch.await();// 等待所有人任務結束 System.out.println("所有的統計任務執行完成:" + sdf.format(new Date())); } static class Stats implements Runnable { String statsName; int runTime; CountDownLatch latch; public Stats(String statsName, int runTime, CountDownLatch latch) { this.statsName = statsName; this.runTime = runTime; this.latch = latch; } public void run() { try { System.out.println(statsName+ " do stats begin at "+ startTime); //模擬任務執行時間 Thread.sleep(runTime); System.out.println(statsName + " do stats complete at "+ sdf.format(new Date())); latch.countDown();//單次任務結束,計數器減一 } catch (InterruptedException e) { e.printStackTrace(); } } } }
結果

方式二(Future)

重點是和springboot整合,採用註解bean方式生成ThreadPoolTaskExecutor

@Bean

//spring依賴包
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
@Configuration
public class GlobalConfig {

    /**
     * 預設執行緒池執行緒池
     *
     * @return Executor
     */
    @Bean
    public ThreadPoolTaskExecutor defaultThreadPool() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //核心執行緒數目
        executor.setCorePoolSize(16);
        //指定最大執行緒數
        executor.setMaxPoolSize(64);
        //佇列中最大的數目
        executor.setQueueCapacity(16);
        //執行緒名稱字首
        executor.setThreadNamePrefix("defaultThreadPool_");
        //rejection-policy:當pool已經達到max size的時候,如何處理新任務
        //CALLER_RUNS:不在新執行緒中執行任務,而是由呼叫者所在的執行緒來執行
        //對拒絕task的處理策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        //執行緒空閒後的最大存活時間
        executor.setKeepAliveSeconds(60);
        //載入
        executor.initialize();
        return executor;
    }
}
使用
//通過註解引入配置
@Resource(name = "defaultThreadPool")
private ThreadPoolTaskExecutor executor;
                        //使用Future方式執行多工
                        //生成一個集合
                        List<Future> futures = new ArrayList<>();

                        //獲取後臺全部有效運營人員的集合
                        List<AdminUserMsgResponse> adminUserDOList = adminManagerService.GetUserToSentMsg(null);

                        for (AdminUserMsgResponse response : adminUserDOList) {
                            //併發處理
                            if (response.getMobile() != null) {
                                Future<?> future = executor.submit(() -> {
                                    //傳送簡訊
                                    mobileMessageFacade.sendCustomerMessage(response.getMobile(), msgConfigById.getContent());
                                });
                                futures.add(future);
                            }
                        }

                      //查詢任務執行的結果
                       for (Future<?> future : futureList) {
                            while (true) {//CPU高速輪詢:每個future都併發輪循,判斷完成狀態然後獲取結果,這一行,是本實現方案的精髓所在。即有10個future在高速輪詢,完成一個future的獲取結果,就關閉一個輪詢
                     if (future.isDone()&& !future.isCancelled()) {//獲取future成功完成狀態,如果想要限制每個任務的超時時間,取消本行的狀態判斷+future.get(1000*1, TimeUnit.MILLISECONDS)+catch超時異常使用即可。
                         Integer i = future.get();//獲取結果
                        System.out.println("任務i="+i+"獲取完成!"+new Date());
                        list.add(i);
                        break;//當前future獲取結果完畢,跳出while
                    } else {
                        Thread.sleep(1);//每次輪詢休息1毫秒(CPU納秒級),避免CPU高速輪循耗空CPU---》新手別忘記這個
                    }
                }
             }