執行緒池之ThreadPoolExecutor
目錄
上一篇說了多執行緒開發和4種常見的執行緒池:https://blog.csdn.net/qq_37321098/article/details/82781885
1.ThreadPoolExecutor建構函式
我們看一下FixedThreadPool的例項化:
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
所以說,4種常見的執行緒池都是對ThreadPoolExecutor配置不同而已。下面主要看看它引數最全的構造方法:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 。。。 }
引數含義如下:
corePoolSize 執行緒池中核心執行緒的數量,即能夠同時執行的任務數量
maximumPoolSize 執行緒池中最大執行緒數量。當我們每新增一個任務,如當執行緒數小於corePoolSize時,則立即開啟執行緒執行;當corePoolSize滿的時候,後面新增的任務將放入緩衝佇列workQueue等待;當workQueue也滿的時候,看是否超過maximumPoolSize執行緒數,如果超過,預設拒絕執行(比如corePoolSize為2且已滿,maximumPoolSize為3,workQueue滿了,此時來2個任務,會開啟1個非核心執行緒去執行1個任務,至於還有1個任務,由於超出maximumPoolSize數量,會被拒絕執行)。
keepAliveTime 非核心執行緒的超時時長,當系統中非核心執行緒閒置時間超過keepAliveTime之後,則會被回收。如果ThreadPoolExecutor的allowCoreThreadTimeOut屬性設定為true,則該引數也表示核心執行緒的超時時長
unit 第三個引數的單位,有納秒、微秒、毫秒、秒、分、時、天等
workQueue 執行緒池中的任務佇列,該佇列主要用來儲存已經被提交但是尚未執行的任務。
threadFactory 為執行緒池提供建立新執行緒的功能,一般使用預設即可
handler 拒絕新增新任務策略,當執行緒無法執行新任務時(一般是由於執行緒池中的執行緒數量已經達到最大數或者執行緒池關閉導致的),預設情況下,當執行緒池無法處理新執行緒時,會丟擲一個RejectedExecutionException。
2.BlockingQueue<Runnable> workQueue引數詳解
BlockingQueue阻塞佇列存放待執行任務,BlockingQueue有多種不同的實現類:
1)ArrayBlockingQueue:規定了大小的BlockingQueue,建構函式的int值可設定大小,任務執行順序為先進先出
2)LinkedBlockingQueue:大小不確定的BlockingQueue,構造方法中可傳一個int值去規定佇列大小,也可以不傳,預設的大小就為Integer.MAX_VALUE
3)PriorityBlockingQueue:和LinkedBlockingQueue類似,不同的是PriorityBlockingQueue中的任務不是先進先出規則,而是按照Comparator來決定存取順序的(因此存入PriorityBlockingQueue中的資料必須實現Comparator介面)。
4)SynchronousQueue:同步的BlockingQueue,屬於執行緒安全的。https://blog.csdn.net/zmx729618/article/details/52980158
3.ThreadPoolExecutor執行任務demo
//建立核心執行緒3,最大執行緒3,超時時長1s,容量100的LinkedBlockingDeque佇列
final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 3,
1, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(100));
findViewById(R.id.bt).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
for (int i = 0; i < 10; i++) {
poolExecutor.execute(new Runnable() {
@Override
public void run() {
SystemClock.sleep(1000);
}
});
}
}
});
執行結果如下:
//12s的時候,執行2任務
13:54:12.096 30856-31644/com.bihucj.mcandroid E/任務:: 1
13:54:12.096 30856-31643/com.bihucj.mcandroid E/任務:: 0
//13s的時候,執行2任務
13:54:13.096 30856-31644/com.bihucj.mcandroid E/任務:: 3
13:54:13.097 30856-31643/com.bihucj.mcandroid E/任務:: 2
此時我們改一下ThreadPoolExecutor引數為:
final ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 5,
1, TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>(5));
執行結果如下:
//核心執行緒執行2任務,然後5個任務放到佇列,有剩下的3個非核心執行緒執行順序較後的3任務
13:59:49.471 9064-9337/com.bihucj.mcandroid E/任務:: 0
13:59:49.471 9064-9338/com.bihucj.mcandroid E/任務:: 1
13:59:49.483 9064-9339/com.bihucj.mcandroid E/任務:: 7
13:59:49.484 9064-9340/com.bihucj.mcandroid E/任務:: 8
13:59:49.484 9064-9341/com.bihucj.mcandroid E/任務:: 9
//佇列中任務開始執行
13:59:50.471 9064-9337/com.bihucj.mcandroid E/任務:: 2
13:59:50.472 9064-9338/com.bihucj.mcandroid E/任務:: 3
13:59:50.484 9064-9339/com.bihucj.mcandroid E/任務:: 4
13:59:50.484 9064-9340/com.bihucj.mcandroid E/任務:: 5
13:59:50.485 9064-9341/com.bihucj.mcandroid E/任務:: 6
模仿AsyncTask設定引數:核心執行緒數為手機CPU數量+1,最大執行緒數為手機CPU數量×2+1,執行緒佇列的大小為128
4.利用submit去提交任務
submit和execute一樣擁有提交任務的功能, 不同在於submit的提交擁有返回值。如下:
Future<?> submit(Runnable task);
<T> Future<T> submit(Runnable task, T result);
<T> Future<T> submit(Callable<T> task);
至於Future可以看看這些:
https://www.cnblogs.com/cz123/p/7693064.html
也就是說,我們的任務被轉換成了Future,在非同步執行的同時,我們還能取消這個任務的執行。看看下面程式碼:
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
先看一個submit提交任務的demo:
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
//ThreadPoolExecutor建立
final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(2, 3, 1,
TimeUnit.SECONDS, new LinkedBlockingDeque<Runnable>());
//儲存結果
final List<Future<User>> futures = new ArrayList<>();
findViewById(R.id.bt).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
for (int i = 0; i < 10; i++) {
Future<User> taskFuture = threadPoolExecutor.submit(new MyTask(i));
//將每一個任務的執行結果儲存起來
futures.add(taskFuture);
}
//檢視結果
try {
for (Future<User> future : futures) {
Log.d("姓名為:", future.get().getName());
}
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
});
}
class User {
private String name;
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
}
class MyTask implements Callable<User> {
private int taskId;
public MyTask(int taskId) {
this.taskId = taskId;
}
@Override
public User call() throws Exception {
SystemClock.sleep(1000);
//返回每一個任務的執行結果
return new User(taskId + "");
}
}
5.自定義ThreadPoolExecutor
@Override
public void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
final MyThreadPool myThreadPool = new MyThreadPool(3, 5, 1, TimeUnit.MINUTES, new LinkedBlockingDeque<Runnable>());
findViewById(R.id.bt).setOnClickListener(new View.OnClickListener() {
@Override
public void onClick(View v) {
for (int i = 0; i < 10; i++) {
final int finalI = i;
Runnable runnable = new Runnable(){
@Override
public void run() {
SystemClock.sleep(100);
Log.d("MyThreadPool", "run: " + finalI);
}
};
myThreadPool.execute(runnable);
}
}
});
}
class MyThreadPool extends ThreadPoolExecutor{
//有4個構造方法可以選擇
public MyThreadPool(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
Log.d("MyThreadPool", "beforeExecute: 開始執行任務!");
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
Log.d("MyThreadPool", "beforeExecute: 任務執行結束!");
}
@Override
protected void terminated() {
super.terminated();
//當呼叫shutDown()或者shutDownNow()時會觸發該方法
Log.d("MyThreadPool", "terminated: 執行緒池關閉!");
}
}