1. 程式人生 > >ExecutorService執行緒池詳解

ExecutorService執行緒池詳解

  • 對執行緒池 “池”的理解
    可以理解為工廠,如果工廠生產斧頭,小王從工廠借了一把,當小王用完了,還給工廠,之後小李也可以借去用
    複用已有資源
    控制資源總量

你一個任務過來了,我發現池子裡有沒事幹並且還活著的執行緒,來,拿去用,我也不用費事給你建立一條執行緒了,要知道執行緒的建立和銷燬可都是麻煩事
你一個任務過來了,我發現池子的執行緒都在忙,並且現在池子的執行緒已經太多了,在不限制下去就要記憶體溢位了,來,排隊去

執行緒池的作用

1、new Thread的弊端

new Thread(new Runnable() {
    @Override
    public
void run() { System.out.println("--------"); } }).start();

a. 每次new Thread新建物件效能差:每次都要經歷新建,就緒,執行,阻塞,死亡
b. 執行緒缺乏統一管理,可能無限制新建執行緒,相互之間競爭,及可能佔用過多系統資源導致宕機或oom。
c. 缺乏更多功能,如定時執行、定期執行、執行緒中斷。

2、相比new Thread,Java提供的四種執行緒池的好處在於:
a. 重用存在的執行緒,減少物件建立、消亡的開銷,效能佳。
b. 可有效控制最大併發執行緒數,提高系統資源的使用率,同時避免過多資源競爭,避免堵塞。
c. 提供定時執行、定期執行、單執行緒、併發數控制等功能。

Executors介紹

通過Executors可以獲得四種執行緒池
Executors
ExecutorService service = Executors.newCachedThreadPool();
//建立一個可快取執行緒池,如果執行緒池長度超過處理需要,可靈活回收空閒執行緒,若無可回收,則新建執行緒。池中物任務時,超過60秒,會自動關閉執行緒池。

ExecutorService service = Executors.newSingleThreadExecutor();
// 單執行緒化的執行緒池,它只會用唯一的工作執行緒來執行任務,保證所有任務按照指定順序(FIFO, LIFO, 優先順序)執行。(測試時都是先進先出的規則)

ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
//建立一個定長執行緒池,支援定時及週期性任務執行。ScheduledExecutorService為ExecutorService的子類

ExecutorService service = Executors.newFixedThreadPool(15);
//建立一個定長執行緒池,可控制執行緒最大併發數,超出的執行緒會在佇列中等待。

例子

  • 執行緒TestThread
public class TestThread extends Thread{

    @Override
    public void run() {
        try {
            Thread.sleep(1000);

        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(System.currentTimeMillis()+"===="+ getPriority()+"----資料庫操作---"+this.getName());
    }
}
  • 快取執行緒池Executors.newCachedThreadPool();
ExecutorService service = Executors.newCachedThreadPool();
for (int i = 0; i < 10; i++) {
    Thread td = new TestThread();
    service.execute(td);
}
  • 單例執行緒池Executors.newSingleThreadExecutor();
ExecutorService service = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
    Thread td = new TestThread();
    service.execute(td);
}
// 列印結果
// 1523331778779====5----資料庫操作---Thread-0
// 1523331779780====5----資料庫操作---Thread-1
// 1523331780780====5----資料庫操作---Thread-2
// 1523331781780====5----資料庫操作---Thread-3
// 1523331782780====5----資料庫操作---Thread-4
// 1523331783780====5----資料庫操作---Thread-5
// 1523331784780====5----資料庫操作---Thread-6
// 1523331785780====5----資料庫操作---Thread-7
// 1523331786781====5----資料庫操作---Thread-8
// 1523331787781====5----資料庫操作---Thread-9
  • 定時執行緒池Executors.newScheduledThreadPool(10);
ScheduledExecutorService service = Executors.newScheduledThreadPool(10);
for (int i = 0; i < 10; i++) {
    Thread td = new TestThread();
    service.schedule(td, 5, TimeUnit.SECONDS); // 新增任務5秒之後執行
}
  • 定長執行緒池Executors.newFixedThreadPool(15);
ExecutorService service = Executors.newFixedThreadPool(15);
for (int i = 0; i < 10; i++) {
    Thread td = new TestThread();
    service.execute(td);
}

專案中的用法

package com.hesvit.common.task.execute;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * <p>
 * 執行緒池建立
 * <p>
 * @author liuping
 * @since 2017年12月26日
 */
public class TaskExecute {
    // 建立一個緩衝池,緩衝池容量大小為Integer.MAX_VALUE
    private static ExecutorService fixedThreadPool; 
    // 建立固定容量大小的緩衝池
    private static ExecutorService singleThreadExecutor;
    // 建立容量為1的緩衝池
    private static ExecutorService cachedThreadPool; 

    public static ExecutorService newFixedThreadPool(int nThreads) {
        if (fixedThreadPool == null)
            return Executors.newFixedThreadPool(nThreads);
        else
            return fixedThreadPool;
    }

    public static ExecutorService newSingleThreadExecutor() {
        if (singleThreadExecutor == null)
            return Executors.newSingleThreadExecutor();
        else
            return singleThreadExecutor;
    }

    public static ExecutorService newCachedThreadPool() {
        if (cachedThreadPool == null)
            return Executors.newCachedThreadPool();
        else
            return cachedThreadPool;
    }

    public static void execute(Runnable task) {
        newFixedThreadPool(6).execute(task);
    }
}

TaskExecute.execute(new RefreshAppUserThread());// 重新整理app使用者資訊

// 執行緒
public class RefreshAppUserThread extends Thread {

    @Override
    public void run() {
        RedisUtil.refreshAppUser();
    }

}


import redis.clients.jedis.Jedis;
@Component
public class RedisUtil {

    /** 過時時間為2小時:2 * 60 *60 */
    private static int EXPIRE_2_HOUR = 7200;
    // 快取所有App使用者
    private static String ALL_APP_USERS = "all_app_users";


    public static List<User> refreshAppUser() {
        return refreshAppUser(null);
    }

    /**
     * 重新整理App使用者列表
     * liuping
     */
    public static List<User> refreshAppUser(Jedis redis) {
        if (redis == null)
            redis = MyJedisPool.getJedisObject();// 獲得jedis例項
        List<User> users = new ArrayList<>();
        try {
            User user = new User();
            user.setLimit(-1);
            WebApplicationContext context = ContextLoader.getCurrentWebApplicationContext();
            UserService service = (UserService) context.getBean("userService");

            users = service.selectByIndex(user);

            if (null != users) {
                JSONArray jsonArray = JSONArray.fromObject(users);
                String struser = String.valueOf(jsonArray);
                redis.set(ALL_APP_USERS, struser);
                redis.expire(ALL_APP_USERS, EXPIRE_2_HOUR); // 設定過期時間
            }
        } catch (Exception e) {
            logger.error("Failed to get wxuser from redis: {}", e.getMessage());
        } finally {
            MyJedisPool.recycleJedisOjbect(redis);
        }
        return users;
    }

}