(三)改進篇:AsyncTask加強版
一、前言
前文連結:深入解讀AsyncTask
上一篇文章我們介紹了AsyncTask的相關知識點,並就其存在的問題做了深入的探討。
AsyncTask總的來說實現簡單,構思精巧,還是有不少地方值得借鑑的;
但因其保守的設計,在通用性方面有較大侷限。
不過,有侷限,才有突破。
接下來,我們將結合APP開發中的使用場景,探討如何設計一個更強的非同步任務框架。
二、任務排程
前文提到,AsyncTask主要做了兩項工作:流程控制和任務排程。
其中任務排程方面主要依賴於兩個Executor:ThreadPoolExecutor 和 SerialExecutor 。
對於ThreadPoolExecutor而言,如果workQueue是容量較大的的佇列,則基本上coreSize就是併發視窗(可以併發執行的執行緒數量)。
coreSize太小,CPU利用率不高,吞吐率低,不適用於IO密集型任務(比如網路請求);
coreSize太大,如果執行的是計算密集型任務,執行緒切換頻繁,CPU計算飽和(影響UI執行緒運算)。
所以,我們先從任務排程入手,解決“利用率/吞吐率”兩難的問題。
同時探索一下在任務排程方面,還有什麼可以完善的。
2.1 併發控制
如果做到既支援CPU的高利用率,又支援任務的高吞吐率呢?
用兩個執行緒池嗎?
倒是可以,但兩個執行緒池會各自維護執行緒,彼此不能複用。
說到複用,AsyncTask給我們提供了一種思路:
先定義一個執行緒池 THREAD_POOL_EXECUTOR ,並行任務可以呼叫此Executor來執行;
封裝SerialExecutor,加了一個任務佇列,控制加入的任務序列執行,但是最終任務還是執行在 THREAD_POOL_EXECUTOR 上。
於是,呼叫者可以選擇序列或者並行,且是在同一個執行緒池中排程的,執行緒可以複用。
就此思路,我們可以仿照SerialExecutor,線上程池ThreadPoolExecutor之上,封裝一個Executor來控制併發,同時建立不同併發視窗的Executor。
簡單地說,就是給水池接不同的水管,不同的水管口徑可以不一樣。
在造管道之前,我們先準備一些其他工具。
前面的SerialExecutor的程式碼中,有兩個重要的構成部分:
1、Runnable的包裝器(雖然是匿名的);
2、下一個任務的觸發器(雖然只是個方法)。
我們把這兩部分都抽象出來:
interface Trigger { fun next() } class RunnableWrapper constructor( private val r: Runnable, private val trigger: Trigger) : Runnable { override fun run() { try { r.run() } finally { trigger.next() } } }
讀者可能會有疑問:抽個RunnableWrapper 也就算了,一個方法也要抽出來?要不要這麼形式化啊?
Take it easy, 下一節我們會看到,抽出來是有原因的。
現在我們先看下如何建造這條“管道”,既然比作“管道”,且命名為PipeExecutor吧。
class PipeExecutor @JvmOverloads constructor( windowSize: Int, private val capacity: Int = -1, private val rejectedHandler: RejectedExecutionHandler = defaultHandler) : TaskExecutor { private val tasks = PriorityQueue<RunnableWrapper>() private val windowSize: Int = if (windowSize > 0) windowSize else 1 private var count = 0 private val trigger : Trigger = object : Trigger { override fun next() { scheduleNext() } } override fun execute(r: Runnable) { schedule(RunnableWrapper(r, trigger), Priority.NORMAL) } fun execute(r: Runnable, priority: Int) { schedule(RunnableWrapper(r, trigger), priority) } @Synchronized internal fun scheduleNext() { count-- if (count < windowSize) { startTask(tasks.poll()) } } @Synchronized internal fun schedule(r: RunnableWrapper, priority: Int) { if (capacity > 0 && tasks.size() >= capacity) { rejectedHandler.rejectedExecution(r, TaskCenter.poolExecutor) } if (count < windowSize || priority == Priority.IMMEDIATE) { startTask(r) } else { tasks.offer(r, priority) } } private fun startTask(active: Runnable?) { if (active != null) { count++ // poolExecutor 是 ThreadPoolExecutor TaskCenter.poolExecutor.execute(active) } } }
解析一下程式碼中的引數和變數:
- tasks :任務緩衝區
- count :正在執行的任務的數量
- windowSize :併發視窗,控制Executor的併發
- capacity :任務緩衝區容量,小於等於0時為不限容量,超過容量觸發rejectedHandler
- rejectedHandler :預設為AbortPolicy(丟擲異常)
- priority :排程優先順序
object Priority { const val IMMEDIATE = Integer.MAX_VALUE const val HIGH = 1 const val NORMAL = 0 const val LOW = -1 }
當count>=windowSize時,priority高者先被排程;
優先順序相同的任務,遵循先進先出(FIFO)的排程規則;
priority=IMMEDIATE會跳過緩衝區直接進入執行緒池。
需要注意的是,排程優先順序不同於執行緒優先順序,執行緒優先順序更底層一些。
比如AsyncTask的doInBackground()中就呼叫了:
Process.setThreadPriority(Process.THREAD_PRIORITY_BACKGROUND)
這可以使得後臺執行緒的執行緒優先順序低於UI執行緒。
以下是PipeExecutor的流程圖:

使用時,可以例項化多個PipeExecutor,他們各自根據引數排程自己的任務佇列,而所有任務最終都是在同一個執行緒池中執行。
比方說可以建立windowSize小一點的PipeExecutor,用於計算密集型任務;
也可以建立windowSize大一點的PipeExecutor,用於IO密集型任務;
還可以使windowSize=1,用於序列執行。
事實上,如果poolExecutor的配置像這樣的話:
new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
PipeExecutor可以實現SingleThreadPool、FixedThreadPool 和 CachedThreadPool 一樣的功能;
而且所有PipeExecutor的例項可以複用執行緒池的執行緒。
最後,有一個疑問:
如果一個水管的流速剛好能處理的話,兩個水管同時進水豈不是滿得更快?
事實上,終端的任務通常是分散的,比方說觸發某個事件(比如開啟頁面),然後做一波任務。
所以兩個水管同時滿負載的情況應該是比較少的。
有一個場景可能會有大量任務:APP啟動的時候。
此時可以根據任務的輕重緩急合理安排,能序列的序列,能推遲的推遲,儘量避免“洪峰”。
2.2 任務去重
這一節可能相對難理解一點,不過我會想辦法儘量表述清楚一些的。
“任務去重”估計讀者沒(怎麼)聽過,就當是筆者提的一個新概念吧,如有雷同,純屬巧合。
要說去重,首先要定義重複:
做任務其實就是做計算,其模式可以表示為 R=f (X) ;
如果任務A和任務B都是計算 R=f (X) , 且 X 相等,那麼我們說A和B是“重複任務”。
兩個任務的計算是等價的,我們可以通過“去重”來節約運算。
什麼時候會出現“重複任務”呢?
舉兩個例子:
一、資料更新,重新整理頁面
假設一個頁面的資料項(R)有多個數據源(X=[x1,x2…xn]), 即R=f(x1,x2…xn);
任何一個數據源更新, R都要重新計算。
通常“資料更新->計算資料->重新整理介面” 會採用“釋出訂閱模式”:
資料來源更新發送事件,接收到事件,啟動任務計算R。
假如x1,x2…xn短時間內相繼更新了資料,傳送了事件e1,e2...en,然後會幾乎同時啟動N個任務計算R。
- 如果任務並行,N個執行緒併發計算R=f(x1,x2…xn),浪費計算資源,更不用說多執行緒問題的複雜性了;
所以這樣的任務最好不要併發執行。 - 如果任務序列,若R的計算比價耗時,第一個任務還沒計算完,又來了n-1個任務。
其實所有的任務都是計算R=f(x1,x2…xn),對於後面的n-1個任務而言,是等價的。
因為在第一個任務還在計算的時候,x1,x2…xn都更新完了,沒有再更新了,
所以對於後面的任務X是一樣的,當然 R=f(X) 也是一樣的了,這就是“重複任務”。
當然,對應的方法就是: 對於後面的任務,只保留一個就好了 。
除了這個方案,我們來看下其他方案:
- 丟棄後面所有的任務。
如果第一個任務在讀取資料X之後x2...xn才更新,那第一個任務的(x1,x2…xn)和後面的任務就不一樣;
如果直接丟棄後面的任務,很顯然,R得不到正確更新。 - 取消之前的任務,直接計算後面的任務。
首先,即使任務可以取消(馬上停止),x2取消第一個任務,x3取消第二個任務……則介面遲遲得不到更新。
最重要的是,任務不是說取消就取消的。
上一篇有討論,Thread.stop()不安全, interrupt()只能中斷wait(), sleep()等,對計算型任務是中斷不了的。 - 過濾事件, 接收到了其中一個事件, 則一定時間內對e1,e2...en都不響應。
這個可操作性更差,而且結果和丟棄後面所有的任務一樣,R得不到正確更新。
其實不僅有多個數據的情況需要去重,即使是單個數據源,也是一樣的,只是舉例多個數據源能更好地體現。
二、圖片載入
這個例子比上一個簡單很多:多個ImageView需要載入同一張圖片。
這其實也出現了“重複任務”,因為是同一張圖片,所以 R=f(X) 等價。
和那個例子不同的是,這裡有多個目標(Target)。
如果說前面的例子是 R->T 的話,這裡則是 {R->T1, R->T2 ... R->Tn} 。
去重策略有所相同,也有所不同。
相同之處是:序列;
不同之處是:不能丟棄任務(不然有的ImageView得不到更新)。
- 序列的話不會慢嗎?
不會,因為圖片載入通常都有快取策略,第一個任務解碼圖片之後,放入快取,後面的任務讀取快取即可。 - 如果是不同的圖片呢?
嗯,關鍵就在這: 相同的任務序列,不同的任務並行 。 - 怎麼看任務是不是“相同的任務”?
這就是需要給任務一個“標籤(tag)”, tag相同則為相同的任務。
比方說下載任務的話可以將url作為tag。
任務去重的實現如下:
class LaneExecutor(private val executor: PipeExecutor, private val discard: Boolean = false) : TaskExecutor { // 正在排程的任務 private val scheduledTasks = HashMap<String, Runnable>() // 丟棄模式下等待的任務 private val waitingTasks by lazy { HashMap<String, TaskWrapper>() } // 非丟棄模式下等待的任務 private val waitingQueues by lazy { HashMap<String, CircularQueue<TaskWrapper>>() } private class TaskWrapper(val r: Runnable, val priority: Int) private inner class LaneTrigger(val tag : String) : Trigger{ override fun next() { executor.scheduleNext() scheduleNext(tag) } } private fun start(r: Runnable, tag: String, priority: Int) { scheduledTasks[tag] = r executor.schedule(RunnableWrapper(r, LaneTrigger(tag)), priority) } @Synchronized fun scheduleNext(tag: String) { scheduledTasks.remove(tag) if (discard) { waitingTasks.remove(tag)?.let { start(it.r, tag, it.priority) } } else { waitingQueues[tag]?.let { queue -> val wrapper = queue.poll() if (wrapper == null) { // 如果佇列清空了,則順便把佇列從HashMap移除,不然HashMap只增不減 waitingQueues.remove(tag) } else { start(wrapper.r, tag, wrapper.priority) } } } } @Synchronized fun execute(tag: String, r: Runnable, priority: Int = Priority.NORMAL) { if (tag.isEmpty()) { executor.execute(r, priority) } else if (!scheduledTasks.containsKey(tag)) { start(r, tag, priority) } else if (discard) { if (waitingTasks.containsKey(tag)) { // 丟棄模式下,如果有相同的任務在等待,則丟棄傳進來的任務 // 而如果傳進來的又是 Futures(實現了Runnable), 則順便呼叫其cancel()方法 if (r is Future<*>) { r.cancel(false) } } else { waitingTasks[tag] = TaskWrapper(r, priority) } } else { // 非丟棄模式下,每個tag都有一個無界佇列可以快取任務 val queue = waitingQueues[tag] ?: CircularQueue<TaskWrapper>().apply { waitingQueues[tag] = this } queue.offer(TaskWrapper(r, priority)) } } }
建構函式有兩個引數:
executor:PipeExecutor的例項。
discard:當discard=true時,只留一個等待的任務; 否則, 不丟棄任務 。
LaneExecutor的實現和PipeExecutor有些相似的,兩者都有緩衝任務的容器,而執行流程上,
大體都是走了 “execute -> start -> 轉發給另一個Executor -> 執行結束 -> scheduleNext ” 這樣一個流程。
區別在於,LaneExecutor由於要“標識任務”,所以有一個 tag 引數貫穿整個流程,連容器都是以tag為key的HashMap。
還有就是, LaneTrigger 實現了 Trigger , 把tag封裝到類中, 因為這裡的next需要用到tag。
如果沒有封裝Trigger,就需要tag傳給RunnableWrapper,這樣就汙染RunnableWrapper了。
RunnableWrapper同時也被PipeExecutor依賴,而PipeExecutor又不需要tag。
因此,雖然只有一個方法,也封裝到介面中。
關於組合和繼承,普遍的觀點是組合優先於繼承。
所以在設計LaneExecutor時,用PipeExecutor作為成員而非繼承於PipeExecutor。

LaneExecutor自己實現任務去重,然後將任務轉發給PipeExecutor。
他們的關係示意圖如下( 分兩種模式,分別對應前面提到的兩種場景):

discard=true

discard=false
洋蔥似地一層包一層,顯然也是裝飾者模式。
這樣的實現估計大家在用各種 InputStream 和 OutputStream 時已經領略了。
總體上看,
LaneExecutor負責任務去重;
PipeExecutor負責任務併發控制和排程優先順序;
ThreadPoolExecutor負責分配執行緒來執行任務。
還有就是,為什麼命名為LaneExecutor呢?
Lane有車道的意思(泳道也是這個詞),看示意圖,是不是有點像車道?
總結一下LaneExecutor的特點:
- 相同的任務序列,不同的任務並行
- discard=true, 序列的任務,各自(按tag分組)最多隻能有一個任務等待,再有提交會被丟棄;
- discard=false, 每個tag分組都有一個無界佇列緩衝,不會丟棄任務。
從另一個角度看,這種discard=false的模式,
給同類的任務加tag,就可以使得同類任務“序列執行”,相當於 PipeExecutor(1)。
2.3 統一定義Executor
當專案複雜度到了一定程度,如果沒有明確的公共定義,可能會出現各種冗餘物件,比如快取和Executor。
分散的Executor無法較好地控制併發;
如果各自建立的是ThreadPoolExecutor,則還要加上一條:降低執行緒複用。
故此,可以集中定義Executor,各模組統一呼叫。
程式碼如下:
object TaskCenter { private val cpuCount = Runtime.getRuntime().availableProcessors() // ... 定義threadFactory, 程式碼省略 internal val poolExecutor: ThreadPoolExecutor = ThreadPoolExecutor( 0, 256, 60L, TimeUnit.SECONDS, SynchronousQueue(), threadFactory) // 常規的任務排程器,可控制任務併發,支援任務優先順序 val io = PipeExecutor(20, 512) val computation = PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 512) // 帶去重策略的 Executor,可用於資料重新整理等任務 val laneIO = LaneExecutor(io, true) val laneCP = LaneExecutor(computation, true) // 相同的tag的任務會被序列執行,相當於序列的Executor // 可用於寫日誌,上報app資訊等任務 val serial = LaneExecutor(PipeExecutor(Math.min(Math.max(2, cpuCount), 4), 1024)) }
以上設定主要參考AsyncTask的執行緒池引數,以及阿里開發手冊“併發處理”的一些說明:

簡單地說就是不允許maximumPoolSize=Integer.MAX_VALUE,也不允許佇列容量為Integer.MAX_VALUE。
所以TaskCenter的poolExecutor的maximumPoolSize設定為256,PipeExecutor的任務緩衝容量也設定了有限的值。
- TaskCenter中的PipeExecutor的windowSize都不大,
所以如果不在TaskCenter外建立windowSize太大的Executor,用Priority.IMMEDIATE的情況也比較少的話,不會導致執行緒過多。 - 任務緩衝容量capacity=512通常情況下也足夠了,但是執行中真的出了意外,比如某個迴圈建立了超量的任務,
則會執行RejectedExecutionHandler,而預設defaultHandler是AbortPolicy,也就是丟擲RejectedExecutionException。
事實上如果真的在執行中發生了意料之外的情況,確實應該拋RuntimeException(RejectedExecutionException就是其中一種)。
如果預見到有可能會建立很多工,可以在TaskCenter之外自定義capacity大一點的PipeExecutor。
poolExecutor的佇列用的是SynchronousQueue,SynchronousQueue是阻塞佇列,其特點是:
如果沒有執行緒在等著取(poll/take)佇列的元素, 則offer返回false(裝不進);
如果有執行緒在等,則可以放進去,而一放進去就會被執行緒取走。
- 執行緒池中用SynchronousQueue,效果為任務放入即被執行,這樣的話相當於不控制任務併發了;
所以我們用PipeExecutor來控制併發,poolExecutor負責只用負責維護執行緒就好了。 - poolExecutor不需要控制併發的話,coreSize也設定為0就好了,
這樣的話,一段時間(60s)沒有任務執行,執行緒可以銷燬。
為什麼不留一些執行緒存活?AsyncTask不也coreSize>0麼?
AsyncTask佇列為LinkedBlockingQueue,所以coreSize主要作用是控制併發;
其threadPoolExecutor呼叫了allowCoreThreadTimeOut(true),沒有任務執行的話也是會銷燬執行緒的。
三、拓展AsyncTask
上一章我們花了大量的篇幅講述任務排程的種種細節,構造了相對完善的Executor(系列)。
這一章我們將結合前面的工作,以AsyncTask為藍本,實現一個更強大的非同步任務框架。
通過繼承AsyncTask無法做到我們預想的效果,所以沒辦法,只能重新寫一個了,反正程式碼也不多。
雖然是重新寫,但原來的絕大部分實現和API都會得到保留。
當然名字也要另起一個,不然就和真正的AsyncTask衝突了;
且命名為 UITask ,因為和純粹的執行緒不同,這個非同步框架還要和UI執行緒互動。
3.1 替換Executor
前面實現的PipeExecutor和LaneExecutor,可以用到UITask中。
實現如下:
abstract class UITask<Params, Progress, Result>{ private val mFullName: String = this.javaClass.name private var mPriority = Priority.NORMAL private val mTag: String by lazy { generateTag() } protected open fun generateTag(): String { return mFullName } protected open val executor: TaskExecutor get() = TaskCenter.laneIO fun execute(vararg params: Params) { if (executor is LaneExecutor) { (executor as LaneExecutor).execute(mTag, mFuture, mPriority) } else { (executor as PipeExecutor).execute(mFuture, mPriority) } } fun priority(priority: Int): UITask<Params, Progress, Result> { var p = priority if (priority != Priority.IMMEDIATE) { if (priority > Priority.HIGH) { p = Priority.HIGH } else if (priority < Priority.LOW) { p = Priority.LOW } } mPriority = p return this } }
抽去了其他程式碼,只保留和Executor相關的。
以上程式碼中有關鍵的幾點:
- executor為TaskExecutor,也就是PipeExecutor和LaneExecutor其中一種;
預設為TaskCenter.laneIO,因為日常使用中用於資料載入的比較多;
聲明瞭"open", 也是就可以通過override來設定需要的TaskExecutor。 - 預設以全限定名為tag, 當然也可以通過重寫generateTag()來自定義tag。
- 優先順序若非 Priority.IMMEDIATE,會被控制在Priority.LOW和Priority.HIGH之間。
基本要素湊齊,PipeExecutor的併發控制和排程優先順序,LaneExecutor的任務去重也都整合進來了,
剩下的就看如何靈活運用了。
3.2 生命週期
上一篇文章我們提到AsyncTask的問題,其中一個就是在Activity銷燬時不會自動取消,當然也可以做到,只是寫起來麻煩。
那麼我們就在UITask封裝一些程式碼,使其可以觀察Activity/Fragment的生命週期。
說到觀察,很自然地就想到了“觀察者模式”來實現。
關係圖如下:

UITask為觀察者,Activity/Fragment為被觀察者。
因為是多對多的關係,所以需要兩個資料結構:一個SparseArray(Map也可以)一個列表。
SparseArray的key為被觀察者的identityHashCode, value為觀察者(UITask)列表。
當被觀察者需要通知事件的時候,再次獲取被觀察者的identityHashCode,索引到對應觀察者列表,遍歷之。
具體實現如下:
object LifeEvent { const val DESTROY = 0 const val SHOW = 1 const val HIDE = 2 } interface LifeListener { fun onEvent(event: Int) }
object LifecycleManager { private val holders = SparseArray<Holder>() fun register(hostHash: Int, listener: LifeListener?) { var holder: Holder? = holders.get(hostHash) if (holder == null) { holder = Holder() holders.put(hostHash, holder) } holder.add(listener) } fun unregister(hostHash: Int, listener: LifeListener?) { holders.get(hostHash)?.remove(listener) } fun notify(host: Any?, event: Int) { val hostHash = System.identityHashCode(host) val index = holders.indexOfKey(hostHash) if (index >= 0) { val holder = holders.valueAt(index).apply {} if (event == LifeEvent.DESTROY) { holders.removeAt(index) } holder.notify(event) } } }
需要注意的是,LifecycleManager不是直接持有被觀察者的引用,而是持有其ID(identityHashCode)。
identityHashCode不是hashCode, 它能夠標識一個物件。
如果沒有identityHashCode,我們也可以給被觀察者分配ID序列,但是那樣侵入性會更大一些。
這樣的實現,只需在Activity/Fragment的對應回撥中呼叫一下LifecycleManager.notify(this, event)即可。
另外我們還注意到,LifeEvent除了DESTROY事件外,還有SHOW和HIDE。
觀察Activity/Fragment的可見和隱藏是何意圖?接下來馬上揭曉。
abstract class UITask<Params, Progress, Result> : LifeListener { private var mHostHash = 0 fun host(host: Any): UITask<Params, Progress, Result> { mHostHash = System.identityHashCode(host) LifecycleManager.register(mHostHash, this) return this } private fun detachHost() { LifecycleManager.unregister(mHostHash, this) } override fun onEvent(event: Int) { if (event == LifeEvent.DESTROY) { if (!isCancelled && status != Status.FINISHED) { // no need to call detachHost for host destroy mHostHash = 0 cancel(true) } } else if (event == LifeEvent.SHOW) { changePriority(+1) } else if (event == LifeEvent.HIDE) { changePriority(-1) } } private fun changePriority(increment: Int) { if (mPriority != Priority.IMMEDIATE) { mPriority = executor.changePriority(mFuture, mPriority, increment) } } }
- host(Any)方法用於註冊觀察者,也就是構建host和Task的關係。
為什麼命名為host呢?因為Task通常是在Activity/Fragment中建立(不然我們也不用大費周章折騰生命週期了),
這時候我們稱Activity/Fragment為“宿主(host)”。 - detachHost() 為私有方法,因為這個方法是在UITask執行完成的時候被呼叫的(內部呼叫)。
- onEvent(Int)函式關注三個事件,前面也提到,除了DESTROY之外,還關注SHOW和HIDE,
主要是在Activity/Fragment的可見狀態改變時調整排程優先順序。
調整優先順序有什麼用呢? 下面先看兩張圖感受一下。
為了凸顯效果,我們把載入任務的併發量控制為1(序列)。
第一張是不會自動調整優先順序的,完全的先進先出:

不改變優先順序
可以看到,切換到第二個頁面,由於上一頁的任務還沒執行完,
所以要一直等到上一頁的任務都完成了才輪到第二個頁面載入。
很顯然這樣體驗不太好。
接下來我們看下動態調整優先順序是什麼效果:

動態調整優先順序
切換到第二個頁面之後,第一個頁面的任務的“排程優先順序”被降低了,所以會優先載入第二個頁面的圖片;
再次切換回第一個頁面,第二個頁面的優先順序被降低,第一個頁面的優先順序恢復,所以優先載入第一個頁面的圖片。
那可否進入第二個頁面的時暫停第一個頁面的任務?
比方說使用者在第二個頁面停留很久,第二個頁面的任務都完成了,然後切換回第一個頁面,發現只有部分圖片(其他被暫停了)。
而如果只是調整優先順序,則第二個頁面的任務都執行完之後,會接著執行第一個頁面的任務,返回第一個頁面時就能夠看到所有圖片了。
這就好比趕車,讓其他人給插個隊,OK,但是不能不給別人排隊了吧~
四、用法
4.1 常規用法
override fun onCreate(savedInstanceState: Bundle?) { // ... TestTask() .priority(Priority.IMMEDIATE) .host(this) .execute("hello") } private inner class TestTask: UITask<String, Integer, String>(){ override fun generateTag(): String { // 一般情況下不需要重寫這個函式,這裡只是為了演示 return "custom tag" } override fun onPreExecute() { result_tv.text = "running" } override fun doInBackground(vararg params: String): String? { for (i in 0..100 step 2) { Thread.sleep(10) publishProgress(Integer(i)) } return "result is:" + params[0].toUpperCase() } override fun onProgressUpdate(vararg values: Integer) { val progress = values[0] progress_bar.progress = progress.toInt() progress_tv.text = "$progress%" } override fun onPostExecute(result: String?) { result_tv.text = result } override fun onCancelled() { showTips("Task cancel ") } }
UITask和AsyncTask用法是類似的, 只是多了一些API:
- 因為生命需要觀察Activity的生命週期,所以需要呼叫host(),傳入當前Activity
- 可以設定任務優先順序
- 有必要時可以重寫generateTag來自定義任務的tag
UITask相比AsyncTask,雖然外表看起來區別不大,但核心卻有質的改變:
1、更靈活的併發控制
2、支援排程優先順序
3、支援任務去重
4、支援生命週期(onDestroy時取消任務,自動調整優先順序)
其中第1點和第4點解決了AsyncTask所存在的問題:
靈活的併發控制,增強了通用性;
支援生命週期,使得任務可以隨Activity銷燬而銷燬,從而也解決了因為生命週期比Activity長導致的記憶體洩漏問題。
至於自動調整優先順序,任務去重等,算是提供了新特性吧。
這些特性,在筆者的另一個開源專案中都用上了。
文章連結: 如何實現一個圖片載入框架
4.2 Executor
當然,專案中不僅僅是UITask,TaskCenter,以及各種Executor, 都是可以單獨使用的。
比方說只是想簡單地執行任務,不需要和UI互動,也可以直接使用Executor:
TaskCenter.io.execute{ // do something } TaskCenter.laneIO.execute("laneIO", { // do something }) val serialExecutor = PipeExecutor(1) serialExecutor.execute{ // do something } TaskCenter.serial.execute ("your tag", { // do something })
4.3 For RxJava
有的文章拿AsyncTask和RxJava做比較,一個300行程式碼的框架和一個2M的框架,其實也沒有太多可比性。
如果說AsyncTask是自行車,“加強版”是摩托車,則RxJava就是汽車(RxJava除了非同步還有更多的內涵)。
各有各的靈魂,不好說誰好誰不好,需看具體場景。
當然,這個“加強版”的初衷是打造更好的非同步任務框架,不是替代RxJava。
而且,雖然摩托車和汽車有較大差異,但取摩托車的汽油來跑汽車也是可以的:
object TaskSchedulers { val io: Scheduler by lazy { Schedulers.from(TaskCenter.io) } val computation: Scheduler by lazy { Schedulers.from(TaskCenter.computation) } val single by lazy { Schedulers.from(PipeExecutor(1)) } }
Observable.range(1, 8) .subscribeOn(TaskSchedulers.computation) .subscribe { Log.d(tag, "number:$it") }
很多開源專案都設計了API來使用外部的Executor,這樣有一個好處:
各種任務都在一個執行緒池上執行任務,可複用彼此建立的執行緒。
4.4 彩蛋
喜歡冰糖葫蘆一樣的鏈式呼叫?
override fun onCreate(savedInstanceState: Bundle?) { // ... val task = ChainTask<Double, Int, String>() task.tag("ChainTest") .preExecute { result_tv.text = "running" } .background { params -> for (i in 0..100 step 2) { Thread.sleep(10) task.publishProgress(i) } "result is:" + (params[0] * 100) } .progressUpdate { values -> val progress = values[0] progress_bar.progress = progress progress_tv.text = "$progress%" } .postExecute { result_tv.text = it } .cancel { showTips("ChainTask cancel ") } .priority(Priority.IMMEDIATE) .host(this) .execute(3.14) }
至於ChainTask是怎麼實現的,本文就不多做介紹了,且留給讀者自行思考;
或者下載專案, 裡面有具體的實現。
五、下載
implementation 'com.horizon.task:task:1.0.4'
相關程式碼已上傳GitHub,