如何實現一個執行緒排程框架
一、前言
執行緒是程式執行流的最小單元,很基礎,也很重要。
為了提高流暢性,耗時任務放後臺執行緒執行,這是APP開發的常識了。
隨著APP複雜度的提升,越來越多工需要開執行緒執行,同時,遇到如下挑戰:
- 任務場景多樣化,常規的API無法滿足;
- 隨著元件化,模組化等演進,可能使得執行緒管理不統一(比如多個執行緒池)。
為此,我們今天來探討一下的如何設計執行緒排程。
話不多說,從執行緒池開始吧。
二、執行緒池
2.1 ThreadPoolExecutor
為了減少執行緒建立和銷燬帶來的時間和空間上的代價,開發中通常會用到執行緒池。
JDK提供了一個很好用的執行緒池的封裝:ThreadPoolExecutor
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
corePoolSize:核心執行緒大小
maximumPoolSize:執行緒池最大容量(需大於等於corePoolSize,否則會拋異常)
keepAliveTime:執行緒執行任務結束之後的存活時間
unit:時間單位
workQueue:任務佇列
threadFactory:執行緒工廠
handler:拒絕策略
執行緒池中有兩個任務容器:
private final HashSet<Worker> workers = new HashSet<Worker>(); private final BlockingQueue<Runnable> workQueue;
前者用於儲存工作者執行緒,後者用於緩衝任務。
值得一提的是,maximumPoolSize限定的是workers的容量,和workQueue無關。
一個任務到來,假設此時容器workers中的執行緒數為n,則
- 當n < corePoolSize時,建立執行緒來執行這個任務,並將執行緒放入workers;
-
當n >= corePoolSize時,
- 若workQueue未滿,則將任務放入workQueue
-
若workQueue已滿,
- 若n < maximumPoolSize, 建立執行緒來執行這個任務,並將執行緒放入workers;
- 若n >= maximumPoolSize, 執行拒絕策略。
當任務執行結束,執行緒會存活keepAliveTime的時間;
時間到,
如果allowCoreThreadTimeOut為true, 或者 n > corePoolSize, 執行緒銷燬;
否則,執行緒進入等待,直到新的任務到來(或者執行緒池關閉)。
關於workQueue,有兩個極端:
- new SynchronousQueue(): 容量為零,一個任務裝也不進;
- new LinkedBlockingQueue(): 無限容量,多少任務都裝不滿。
2.2 Executors
為了方便使用,JDK還封裝了一些常用的ExecutorService:
public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
}
型別 | 最大併發 | 適用場景 |
---|---|---|
newFixedThreadPool | nThreads | 計算密集型任務 |
newSingleThreadExecutor | 1 | 序列執行的任務 |
newCachedThreadPool | Integer.MAX_VALUE | IO密集型任務 |
newScheduledThreadPool | Integer.MAX_VALUE | 定時任務,週期任務 |
眾多ExecutorService中,newCachedThreadPool() 是比較特別的,
1、corePoolSize = 0,
2、maximumPoolSize = Integer.MAX_VALUE,
3、workQueue 為 SynchronousQueue。
效果是:所有任務立即排程,無容量限制,無併發限制。
這樣的特點比較適合網路請求任務。
OkHttp的非同步請求所用執行緒池與此類似(除了ThreadFactory ,其他引數一模一樣)。
public synchronized ExecutorService executorService() {
if (executorService == null) {
executorService = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), Util.threadFactory("OkHttp Dispatcher", false));
}
return executorService;
}
2.3 執行緒池大小的估算
一臺裝置上,給定一批任務,要想最快時間完成所有任務,併發量應該如何控制?
一些文章提到如下估算公式:
M:併發數;
C:任務佔用CPU的時間;
I:等待IO完成的時間(為簡化討論,且只考慮IO);
N:CPU核心數。
遺憾的是,對於APP來說,這條公式並不適用:
- 任務佔用CPU時間和IO時間無法估算
APP上的非同步任務通常是碎片化的,而不同的任務性質不一樣,有的計算耗時多,有的IO耗時多;
然後同樣是IO任務,比方說網路請求,IO時間也是不可估計的(受伺服器和網速影響)。
- 可用CPU核心可能會變化
有的裝置可能會考慮省電或者熱量控制而關閉一些核心;
大家經常吐槽的“一核有難,九核圍觀”對映的就是這種現象。
雖然該公式不能直接套用來求解最大併發,但仍有一些指導意義:
**IO等待時間較多,則需要高的併發,來達到高的吞吐率;
CPU計算部分較多,則需要降低併發,來提高CPU的利用率。**
換言之,就是:
做計算密集型任務時控制併發小一點;
做IO密集型任務時控制併發大一點。
比如RxJava就提供了Schedulers.computation()和Schedulers.io(),
前者預設情況下為最大併發為CPU核心數,後者最大併發為Integer.MAX_VALUE。
三、執行緒框架
JDK提供執行緒池是比較基礎,通用的API。
APP開發中,大家通常會使用一些為特定場景做對應的封裝框架,比如AsyncTask和RxJava。
AsyncTask的定位是“方便非同步任務和主執行緒互動”的“輕量級執行緒框架”,RxJava 則不僅僅是執行緒框架,其內涵更加豐富。
AsyncTask自誕生之初就被廣泛吐槽,但是對其原始碼分析倒是樂此不彼;
RxJava開始在Android中普及的階段,AsyncTask又被錘了一遍;
到現在很少人提AsyncTask了,零零星星地會被提起。
其實AsyncTask刨去註釋只有三百多行程式碼,而RxJava的jar包有兩M多,猶如單車和汽車,各有各的定位。
我們就不做太多的比較了,這裡主要是提一下,承上啟下的作用。
AsyncTask可能因為其定位的原因,設計有些保守,但總的來說實現簡單,構思精巧,還是有不少地方值得借鑑的。
接下來,我們以AsyncTask為藍本,結合APP開發中的使用場景,探討如何設計一個適用性更強的執行緒框架。
四、執行緒排程
4.1 執行緒複用
第二節中我們分析了執行緒池和幾個ExecutorService,結論是不同的任務特徵,用不同的排程器。
但是,比方說如果直接呼叫 newFixedThreadPool 和 newSingleThreadExecutor 來分別執行任務的話,
會有兩個執行緒池,彼此的任務不能複用執行緒,造成浪費。
對此,AsyncTask給我們提供了一種思路。
先看程式碼:
private static class SerialExecutor implements Executor {
final ArrayDeque<Runnable> mTasks = new ArrayDeque<Runnable>();
Runnable mActive;
public synchronized void execute(final Runnable r) {
mTasks.offer(new Runnable() {
public void run() {
try {
r.run();
} finally {
scheduleNext();
}
}
});
if (mActive == null) {
scheduleNext();
}
}
protected synchronized void scheduleNext() {
if ((mActive = mTasks.poll()) != null) {
THREAD_POOL_EXECUTOR.execute(mActive);
}
}
}
/**
* An {@link Executor} that can be used to execute tasks in parallel.
*/
public static final Executor THREAD_POOL_EXECUTOR;
static {
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
CORE_POOL_SIZE, MAXIMUM_POOL_SIZE, KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
sPoolWorkQueue, sThreadFactory);
threadPoolExecutor.allowCoreThreadTimeOut(true);
THREAD_POOL_EXECUTOR = threadPoolExecutor;
}
/**
* An {@link Executor} that executes tasks one at a time in serial
* order. This serialization is global to a particular process.
*/
public static final Executor SERIAL_EXECUTOR = new SerialExecutor();
先定義一個執行緒池THREAD_POOL_EXECUTOR,並行任務可以呼叫此Executor來執行;
封裝一個SerialExecutor,加了一個任務佇列,控制加入的任務序列執行,但是最終還是執行在THREAD_POOL_EXECUTOR上。
於是,呼叫者可以選擇序列或者並行,且是在同一個執行緒池中排程的,執行緒可以複用。
這裡摳一下細節:
1、原始碼中THREAD_POOL_EXECUTOR的註釋,“execute tasks in parallel”。
parallel, 並行;Concurrent,併發。
個人認為此處應為“併發”,參考:併發與並行的區別。
2、SERIAL_EXECUTOR的註釋:“This serialization is global to a particular process.”。
這裡沒有什麼錯誤,但是要注意一個詞,global。
global, 意味著不同的任務公用一個序列佇列,可能會彼此阻塞。
在3.0之後, AsyncTask預設排程器是這個SERIAL_EXECUTOR。
關於這個設定,印象最深的是這位開發者遇到的“坑”:《使用AsyncTask時需要注意的隱含bug》
簡單地說,就是他同時用了兩個SDK,一個用來做圖片剪裁,一個是facebook的廣告SDK。
然後發現圖片載入不出來,經過核查發現兩個SDK都用了AsyncTask, 但是都是用的序列的Executor。
國內訪問外網速度偏慢,所以facebook的SDK阻塞了後面的任務(圖片剪裁)。
後來作者給這個圖片剪裁庫的開發者提了issue:Android-Image-Cropper, issues 183
關於這個問題,簡單的解決方法是不同的任務用不同的SerialExecutor,共用執行緒池,但各自序列執行,互不干擾。
後面我們會介紹其他方案,接下來先繼續分析Executor。
4.2 封裝Executor
4.2.1 任務分組
上面我們看到,AsyncTask通過Executor包裝Executor, 建立了SerialExecutor,增加了序列執行的能力。
這種技巧我們在JDK的InputStream和OutputStream也領略過了,大家稱之為“裝飾者模式”。
雖然拓展序列執行的能力,但是還是不支援分組併發。
為此,我們仿照SerialExecutor封裝一個Executor:
open class PipeExecutor @JvmOverloads constructor(
windowSize: Int,
private val capacity: Int = -1,
private val rejectedHandler: RejectedExecutionHandler = defaultHandler) : TaskExecutor{
private val tasks = PriorityQueue<PriorityRunnable>()
private val windowSize: Int = if (windowSize > 0) windowSize else 1
private var count = 0
companion object {
val defaultHandler = ThreadPoolExecutor.AbortPolicy()
}
@Synchronized
override fun execute(r: Runnable, tag: String, priority: Int, finish: (tag: String) -> Unit) {
if(capacity > 0 && count + tasks.size() >= capacity){
rejectedHandler.rejectedExecution(r, TaskCenter.executor)
}
val active = PriorityRunnable(r, tag, finish)
if (count < windowSize || priority == Priority.IMMEDIATE) {
startTask(active)
} else {
tasks.offer(active, priority)
}
}
override fun execute(r: Runnable) {
execute(r, "")
}
// ......
private fun startTask(active: Runnable?) {
if (active != null) {
count++
// 執行緒池封裝在 TaskCenter 中,任務最終在該執行緒池中執行
TaskCenter.poolExecutor.execute(active)
}
}
}
class PriorityRunnable internal constructor(
private val r: Runnable,
private val tag: String,
private val finish: (tag: String) -> Unit) : Runnable {
override fun run() {
try {
r.run()
} finally {
scheduleNext()
if(!tag.isEmpty()){
finish(tag)
}
}
}
// ......
}
解析一下程式碼中的引數:
windowSize:控制Executor的併發;
capacity:Executor容量,-1時為不限容量,超過容量觸發rejectedHandler;
rejectedHandler:預設為AbortPolicy(丟擲異常);
priority:排程優先順序,當任務數量超過windowSize時,priority高者先被排程;
tag:任務標識;
finish: 任務結束後觸發此回撥,搭配tag完成一項功能(接下來會有介紹)。
使用時,可以例項化多個PipeExecutor,他們各自根據引數排程自己的任務佇列,但最終都是在同一個執行緒池中執行。
比方說可以建立windowSize設定為cpu數量的PipeExecutor,用於計算密集型任務;
也可以建立windowSize多一點的PipeExecutor,用於IO密集型任務;
還可以使windowSize=1,用於序列執行。
PipeExecutor支援優先順序,當優先順序設定為IMMEDIATE為立即執行。
優先順序相同的任務,遵循先進先出(FIFO)的排程規則。
4.2.2 任務去重
APP開發中常會遇到任務重複的情況。
比方說一個頁面所展示的資料可能來自多個數據源,而每個資料來源的變更入口有多個,當同時有幾個資料變更時,如果不做去重,會浪費計算資源,甚至使得APP卡頓;
又如,有幾個資料項所記錄的是同一張圖片,需要上傳,然後更新路徑為服務端回傳的URL,如果資料上傳是併發的,會導致圖片重複上傳。
說到去重,首先要定義重複;
要定義重複,就要給任務設定標識,相同標識視為重複。
所以TaskExecutor給到的execute方法可以傳tag引數,用tag標識一類任務。
不同的任務型別,去重策略也不一樣。
1、資料重新整理任務
當重新整理任務在執行時,忽略後面的任務。不妥。忽略後面的任務,可能造成頁面沒有正確更新。
有任務正在執行,取消之,新建任務。也不妥。取消前面的任務,極端情況下(比如間隔性持續有重新整理通知到達),可能會造成頁面遲遲得不到更新。
這類任務的特徵是,當任務未開始,一個和多個是等價的,故此對應的策略為:當有任務在執行時,保留一個任務在佇列,忽略後來者。
其示意圖如下:
其特徵為:
不相同tag的任務併發,想同tag的任務序列;
但是tag相同的任務,最多隻能存2個,更多的後來者將會被忽略。
進入排程的任務也不一定會被馬上執行,只是被放到PipeExecutor中,進行下一層的排程。
2、圖片載入任務
圖片載入任務通常用圖片的路徑作為tag。
但圖片載入除了path之外,還有target(要載入到哪個ImageView)。
所以不能採用上面的“忽略後來者”的策略,否則有可能導致有的ImageView載入不出圖片(多個ImageView需要載入同一張圖片的情況)。
把target混入tag?不行。有可能導致重複下載或者重複解碼。
而如果讓path相同的載入任務序列,則可以複用快取。
從這個角度看,也是一種“去重”。
對應示意圖如下:
其排程模式和前面的“資料重新整理任務”很像,只是沒有"ignore"。
從另一個角度看,這種模式可以用於執行“序列的任務”,只需要給同類的任務加tag即可。
這樣的話就不用到處建立windowSize=1的PipeExecutor了。
任務去重的實現如下:
class LaneExecutor(private val executor: PipeExecutor, private val limit: Boolean = false) : TaskExecutor {
private val scheduledTasks = HashMap<String, Runnable>()
private val waitingQueues by lazy { HashMap<String, CircularQueue<TaskWrapper>>() }
private val waitingTasks by lazy { HashMap<String, TaskWrapper>() }
private class TaskWrapper(val r: Runnable, val priority: Int)
private val finishCallback: (tag: String) -> Unit = { tag ->
scheduleNext(tag)
}
@Synchronized
override fun scheduleNext(tag: String) {
scheduledTasks.remove(tag)
if (limit) {
waitingTasks.remove(tag)?.let { start(it.r, tag, it.priority) }
} else {
waitingQueues[tag]?.let {
val wrapper = it.poll()
if (wrapper == null) {
waitingQueues.remove(tag)
} else {
start(wrapper.r, tag, wrapper.priority)
}
}
}
}
@Synchronized
override fun execute(r: Runnable, tag: String, priority: Int, finish: (tag: String) -> Unit) {
if (scheduledTasks.containsKey(tag)) {
if (limit) {
if (waitingTasks.containsKey(tag)) {
if (r is Future<*>) {
r.cancel(false)
}
} else {
waitingTasks[tag] = TaskWrapper(r, priority)
}
} else {
val queue = waitingQueues[tag]
?: CircularQueue<TaskWrapper>().apply { waitingQueues[tag] = this }
queue.offer(TaskWrapper(r, priority))
}
} else {
start(r, tag, priority)
}
}
private fun start(r: Runnable, tag: String, priority: Int) {
scheduledTasks[tag] = r
executor.execute(r, tag, priority, finishCallback)
}
}
PipeExecutor和LaneExecutor的關係如下圖:
之前PipeExecutor通過裝飾者模式,在ThreadPoolExecutor之上包裝了一層,拓展了分組,優先順序等特性,
如今LaneExecutor在PipeExecutor上又包了一層,拓展了去重的特性。
關於組合和繼承,普遍的觀點是組合優先於繼承。
所以在設計LaneExecutor時,用PipeExecutor作為成員而非繼承於PipeExecutor。
4.3 全域性排程
當專案複雜度到了一定程度,如果沒有相對嚴格的規範約束的話,可能會看到各種各樣的冗餘物件,比如快取和Executor。
因為不想被其他模組所幹擾,或者圖方便,開發者可能會在自己的模組定義自己的Executor。
分散的Executor有隔離的效果(不會相互影響),但副作用就是無法集中管控,過多的執行緒併發執行可能會導致資源爭搶以及帶來更多執行緒切換代價。
如果各自建立的原生JDK提供的執行緒池,則還要加上一條:降低執行緒複用。
故此,可以集中定義執行器,各模組統一呼叫。
object TaskCenter {
private val cpuCount = Runtime.getRuntime().availableProcessors()
// ......
// standard Executor
val io = PipeExecutor(16, 512)
val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 6), 256)
// use to execute tasks which need to run in serial,
// such as writing logs, reporting app info to server ...
val lane = LaneExecutor(PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512))
// use to execute general tasks,such as loading data.
val laneIO = LaneExecutor(io, true)
val laneCP = LaneExecutor(computation, true)
}
很多開源專案都設計了API來使用外部的Executor,例如RxJava的話可以這樣使用:
object TaskSchedulers {
val io: Scheduler by lazy { Schedulers.from(TaskCenter.io) }
val computation: Scheduler by lazy { Schedulers.from(TaskCenter.computation) }
val single by lazy { Schedulers.from(PipeExecutor(1)) }
}
使用:
Observable.range(1, 8)
.subscribeOn(TaskSchedulers.computation)
.subscribe { Log.d(tag, "number:$it") }
五、拓展AsyncTask
通過上面構造的相對完善的Executor,我們可以用於擴充套件AsyncTask。
通過繼承AsyncTask無法做到我們預想的效果,所以沒辦法,只能重新寫一個了。
限於篇幅,這裡就不分析具體實現了。
大體框架還是Executor + Handler, 只是Executor換上了TaskExecutor,以及新增生命週期(被錘得最多的缺點之一)的支援。
簡單地說,就是通過觀察者模式實現對宿主的生命週期(onDestroy, onPause, onResume)的監聽,在onDestroy是取消任務,在onPause時降低優先順序,在onResume時恢復優先順序。
這裡補充一點,關於AsyncTask的cancel, 有不少文章說不一定能立即取消任務。
確實是不一定能立即取消,但這其實是合理的。
當呼叫AsyncTask的cancel(mayInterruptIfRunning), 並傳入true時,會觸發interrupt()。
關於interrupt()知乎上有不錯的討論:Java裡一個執行緒呼叫了Thread.interrupt()到底意味著什麼。
interrupt() 雖然不能保證馬上終止任務,但是能夠中斷sleep(), wait()等方法;
如果使用OkHttp, interrupt()能夠中斷網路請求。
為什麼不用Thread.stop()呢? Thread.stop()是個危險的方法。
比方說一個執行緒正在寫入資料,如果突然中止,可能資料就不對了;
更有甚者,可能導致檔案不完整,可能導致檔案的資料都丟失了。
拓展後用法和原生的AsyncTask用法是類似的,只是多了一些方法,以提供額外的功能,例如優先順序,以及監聽Activity/Fragment生命週期。
六、下載
implementation 'com.horizon.task:task:1.0.3'
相關程式碼已上傳GitHub,
專案地址:https://github.com/No89757/Task