Kotlin Coroutines(協程) 完全解析(三),封裝非同步回撥、協程間關係及協程的取消
Kotlin Coroutines(協程) 完全解析系列:
ofollow,noindex" target="_blank">Kotlin Coroutines(協程) 完全解析(一),協程簡介
Kotlin Coroutines(協程) 完全解析(二),深入理解協程的掛起、恢復與排程
Kotlin Coroutines(協程) 完全解析(三),封裝非同步回撥、協程間關係及協程的取消
本文基於 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1
前面兩篇文章解析了掛起函式通過狀態機來實現,協程的本質就是有三層包裝的Continuation
,這篇文章進一步解析協程的運用。主要介紹如何將非同步回撥封裝為掛起函式,解析協程之間的關係以及協程的取消。
1. 封裝非同步回撥為掛起函式
在非同步程式設計中,回撥是非常常見的寫法,那麼如何將回調轉換為協程中的掛起函式呢?可以通過兩個掛起函式suspendCoroutine{}
或suspendCancellableCoroutine{}
,下面看如何將 OkHttp 的網路請求轉換為掛起函式。
suspend fun <T>Call<T>.await(): T = suspendCoroutine { cont -> enqueue(object : Callback<T> { override fun onResponse(call:Call<T>, response:Response<T>) { if (response.isSuccessful) { cont.resume(response.body()!!) } else { cont.resumeWithException(ErrorResponse(response)) } } override fun onFailure(call:Call<T>, t:Throwable) { cont.resumeWithException(t) } }) }
上面的await()
的擴充套件函式呼叫時,首先會掛起當前協程,然後執行enqueue
將網路請求放入佇列中,當請求成功時,通過cont.resume(response.body()!!)
來恢復之前的協程。
再來看下suspendCoroutine{}
和suspendCancellableCoroutine{}
的定義:
public suspend inline fun <T>suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T = suspendCoroutineUninterceptedOrReturn { c: Continuation<T> -> val safe = SafeContinuation(c.intercepted()) block(safe) safe.getOrThrow() } public suspend inline fun <T>suspendCancellableCoroutine( crossinline block: (CancellableContinuation<T>) -> Unit ): T = suspendCoroutineUninterceptedOrReturn { uCont -> val cancellable = CancellableContinuationImpl(uCont.intercepted(), resumeMode = MODE_CANCELLABLE) // 和 suspendCoroutine 的區別就在這裡,如果協程已經被取消或者已完成,就會丟擲 CancellationException 異常 cancellable.initCancellability() block(cancellable) cancellable.getResult() }
它們的關鍵實現都是呼叫suspendCoroutineUninterceptedOrReturn()
函式,它的作用是獲取當前協程的例項,並且掛起當前協程或者不掛起直接返回結果。
協程中還有兩個常見的掛起函式使用到了suspendCoroutineUninterceptedOrReturn()
函式,分別是delay()
和yield()
。
1.1 delay 的實現
public suspend fun delay(timeMillis:Long) { if (timeMillis <= 0) return // don't delay return suspendCancellableCoroutine sc@ { cont: CancellableContinuation<Unit> -> cont.context.delay.scheduleResumeAfterDelay(timeMillis, cont) } } /** Returns [Delay] implementation of the given context */ internal val CoroutineContext.delay: Delay get() = get(ContinuationInterceptor) as? Delay ?: DefaultDelay internal actual val DefaultDelay: Delay = DefaultExecutor
delay 使用suspendCancellableCoroutine
掛起協程,而協程恢復的一般情況下是關鍵在DefaultExecutor.scheduleResumeAfterDelay()
,其中實現是schedule(DelayedResumeTask(timeMillis, continuation))
,其中的關鍵邏輯是將 DelayedResumeTask 放到 DefaultExecutor 的佇列最後,在延遲的時間到達就會執行 DelayedResumeTask,那麼該 task 裡面的實現是什麼:
override fun run() { // 直接在呼叫者執行緒恢復協程 with(cont) { resumeUndispatched(Unit) } }
1.2 yield 的實現
yield()
的作用是掛起當前協程,然後將協程分發到 Dispatcher 的佇列,這樣可以讓該協程所線上程或執行緒池可以執行其他協程邏輯,然後在 Dispatcher 空閒的時候繼續執行原來協程。簡單的來說就是讓出自己的執行權,給其他協程使用,當其他協程執行完成或也讓出執行權時,一開始的協程可以恢復繼續執行。
看下面的程式碼示例:
fun main(args:Array<String>) = runBlocking<Unit> { launch { repeat(3) { println("job1 repeat$ittimes") yield() } } launch { repeat(3) { println("job2 repeat$ittimes") yield() } } }
通過yield()
實現 job1 和 job2 兩個協程交替執行,輸出如下:
job1 repeat 0 times job2 repeat 0 times job1 repeat 1 times job2 repeat 1 times job1 repeat 2 times job2 repeat 2 times
現在來看其實現:
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont -> val context = uCont.context // 檢測協程是否已經取消或者完成,如果是的話丟擲 CancellationException context.checkCompletion() // 如果協程沒有執行緒排程器,或者像 Dispatchers.Unconfined 一樣沒有進行排程,則直接返回 val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit if (!cont.dispatcher.isDispatchNeeded(context)) return@sc Unit // dispatchYield(Unit) 最終會呼叫到 dispatcher.dispatch(context, block) 將協程分發到排程器佇列中,這樣執行緒可以執行其他協程 cont.dispatchYield(Unit) COROUTINE_SUSPENDED }
所以注意到,yield()
需要依賴協程的執行緒排程器,而排程器再次執行該協程時,在第二篇中有講過會呼叫resume
來恢復協程執行。
現在來看封裝非同步邏輯為掛起函式的關鍵是用suspendCoroutineUninterceptedOrReturn
函式包裝,然後在非同步邏輯完成時呼叫resume
手動恢復協程。
2. 協程之間的關係
官方文件中有提到協程之間可能存在父子關係,取消父協程時,也會取消所有子協程。在Job
的原始碼中有這樣一段話描述協程間父子關係:
* A parent-child relation has the following effect: * * * Cancellation of parent with [cancel] or its exceptionalcompletion(failure) *immediately cancels all its children. * * Parent cannot complete until all its children are complete. Parent waits for all its children to *complete in _completing_ or _cancelling_ state. * * Uncaught exception in a child, by default, cancels parent. In particular, this applies to *children created with [launch][CoroutineScope.launch] coroutine builder. Note, that *[async][CoroutineScope.async] and other future-like *coroutine builders do not have uncaught exceptions by definition, since all their exceptions are *caught and are encapsulated in their result.
所以協程間父子關係有三種影響:
-
父協程手動呼叫
cancel()
或者異常結束,會立即取消它的所有子協程。 -
父協程必須等待所有子協程完成(處於完成或者取消狀態)才能完成。
-
子協程丟擲未捕獲的異常時,預設情況下會取消其父協程。
下面先來看看協程是如何建立父子關係的,launch
和async
新建協程時,首先都是newCoroutineContext(context)
新建協程的 CoroutineContext 上下文,下面看其具體細節:
public actual fun CoroutineScope.newCoroutineContext(context:CoroutineContext): CoroutineContext { // 新協程繼承了原來 CoroutineScope 的 coroutineContext val combined = coroutineContext + context val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined // 當新協程沒有指定執行緒排程器時,會預設使用 Dispatchers.Default return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) debug + Dispatchers.Default else debug }
所以新的協程的 CoroutineContext 都繼承了原來 CoroutineScope 的 coroutineContext,然後launch
和async
新建協程最後都會呼叫start(start: CoroutineStart, receiver: R, block: suspend R.() -> T)
,裡面第一行是initParentJob()
,通過註釋可以知道就是這個函式建立父子關係的,下面看其實現細節:
// AbstractCoroutine.kt internal fun initParentJob() { initParentJobInternal(parentContext[Job]) } // JobSupport.kt internal fun initParentJobInternal(parent:Job?) { check(parentHandle == null) if (parent == null) { parentHandle = NonDisposableHandle return } parent.start() // make sure the parent is started @Suppress("DEPRECATION") // 關鍵在於 parent.attachChild(this) val handle = parent.attachChild(this) parentHandle = handle // now check our state _after_ registering (see tryFinalizeSimpleState order of actions) if (isCompleted) { handle.dispose() parentHandle = NonDisposableHandle // release it just in case, to aid GC } }
這裡需要注意的是GlobalScope
和普通協程的CoroutineScope
的區別,GlobalScope
的 Job 是為空的,GlobalScope.launch{}
和GlobalScope.async{}
新建的協程是沒有父協程的。
下面繼續看attachChild
的實現:
public final override fun attachChild(child:ChildJob): ChildHandle { return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(this, child).asHandler) as ChildHandle }
invokeOnCompletion()
函式在前一篇解析 Deferred.await() 中有提到,關鍵是將 handler 節點新增到父協程 state.list 的末尾。
2.1 父協程手動呼叫cancel()
或者異常結束,會立即取消它的所有子協程
跟蹤父協程的cancel()
呼叫過程,其中關鍵過程為 cancel() -> cancel(null) -> cancelImpl(null) -> makeCancelling(null) -> tryMakeCancelling(state, causeException) -> notifyCancelling(list, rootCause),下面繼續分析notifyCancelling(list, rootCause)
的實現:
// JobSupport.kt private fun notifyCancelling(list:NodeList, cause:Throwable) { // first cancel our own children onCancellation(cause) // 這裡會呼叫所有子協程繫結的 ChildHandleNode.invoke(cause) -> childJob.parentCancelled(parentJob) 來取消所有子協程 notifyHandlers<JobCancellingNode<*>>(list, cause) // then cancel parent // cancelParent(cause) 不一定會取消父協程,cancel() 時不會取消父協程,因為此時產生 cause 的是 JobCancellationException,屬於 CancellationException cancelParent(cause) // tentative cancellation -- does not matter if there is no parent } public final override fun parentCancelled(parentJob:ParentJob) { // 父協程取消時,子協程會通過 parentCancelled 來取消自己 cancelImpl(parentJob) } private fun cancelParent(cause:Throwable): Boolean { // CancellationException is considered "normal" and parent is not cancelled when child produces it. // This allow parent to cancel its children (normally) without being cancelled itself, unless // child crashes and produce some other exception during its completion. if (cause is CancellationException) return true if (!cancelsParent) return false // 當 cancelsParent 為 true, 且子執行緒丟擲未捕獲的異常時,預設情況下 childCancelled() 會取消其父協程。 return parentHandle?.childCancelled(cause) == true }
2.2 父協程必須等待所有子協程完成(處於完成或者取消狀態)才能完成
前一篇文章有提到協程的完成通過AbstractCoroutine.resumeWith(result)
實現,呼叫過程為 makeCompletingOnce(result.toState(), defaultResumeMode) -> tryMakeCompleting(),其中關鍵原始碼如下:
// JobSupport.kt private fun tryMakeCompleting(state:Any?, proposedUpdate:Any?, mode:Int): Int { ... // now wait for children val child = firstChild(state) // 等待子協程完成 if (child != null && tryWaitForChild(finishing, child, proposedUpdate)) return COMPLETING_WAITING_CHILDREN // otherwise -- we have not children left (all were already cancelled?) if (tryFinalizeFinishingState(finishing, proposedUpdate, mode)) return COMPLETING_COMPLETED // otherwise retry return COMPLETING_RETRY } private tailrec fun tryWaitForChild(state:Finishing, child:ChildHandleNode, proposedUpdate:Any?): Boolean { // 新增 ChildCompletion 節點到子協程的 state.list 末尾,當子協程完成時會呼叫 ChildCompletion.invoke() val handle = child.childJob.invokeOnCompletion( invokeImmediately = false, handler = ChildCompletion(this, state, child, proposedUpdate).asHandler ) if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it // 迴圈設定所有其他子協程 val nextChild = child.nextChild() ?: return false return tryWaitForChild(state, nextChild, proposedUpdate) }
tryWaitForChild()
也是通過invokeOnCompletion()
新增節點到子協程的 state.list 中,當子協程完成時會呼叫 ChildCompletion.invoke():
// ChildCompletion class override fun invoke(cause:Throwable?) { parent.continueCompleting(state, child, proposedUpdate) } private fun continueCompleting(state:Finishing, lastChild:ChildHandleNode, proposedUpdate:Any?) { require(this.state === state) // consistency check -- it cannot change while we are waiting for children // figure out if we need to wait for next child val waitChild = lastChild.nextChild() // try wait for next child if (waitChild != null && tryWaitForChild(state, waitChild, proposedUpdate)) return // waiting for next child // no more children to wait -- try update state // 當所有子協程都完成時,才會 tryFinalizeFinishingState() 完成自己 if (tryFinalizeFinishingState(state, proposedUpdate, MODE_ATOMIC_DEFAULT)) return }
2.3 子協程丟擲未捕獲的異常時,預設情況下會取消其父協程。
子執行緒丟擲未捕獲的異常時,後續的處理會如何呢?在前一篇解析中協程的運算在第二層包裝 BaseContinuationImpl 中,我們再看一次:
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result:Result<Any?>) { ... var param = result while (true) { with(current) { val completion = completion!! val outcome: Result<Any?> = try { // 呼叫 invokeSuspend 方法執行,執行協程的真正運算邏輯 val outcome = invokeSuspend(param) if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { // 協程丟擲未捕獲的異常,會在這裡被攔截,然後作為結果完成協程 Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // 協程的狀態修改在 AbstractCoroutine.resumeWith() 中 completion.resumeWith(outcome) return } } } } }
所以協程有未捕獲的異常中,會在第二層包裝中的resumeWith()
捕獲到,然後呼叫第一層包裝 AbstractCoroutine.resumeWith() 來取消當前協程,處理過程為 AbstractCoroutine.resumeWith(Result.failure(exception)) -> JobSupport.makeCompletingOnce(CompletedExceptionally(exception), defaultResumeMode) -> tryMakeCompleting(state, CompletedExceptionally(exception), defaultResumeMode) -> notifyCancelling(list, exception) -> cancelParent(exception),所以出現未捕獲的異常時,和手動呼叫cancel()
一樣會呼叫到 notifyCancelling(list, exception) 來取消當前協程,和手動呼叫cancel()
的區別在於 exception 不是 CancellationException。
private fun cancelParent(cause:Throwable): Boolean { // CancellationException is considered "normal" and parent is not cancelled when child produces it. // This allow parent to cancel its children (normally) without being cancelled itself, unless // child crashes and produce some other exception during its completion. if (cause is CancellationException) return true if (!cancelsParent) return false // launch 和 async 新建的協程的 cancelsParent 都為 true, 所以子執行緒丟擲未捕獲的異常時,預設情況下 childCancelled() 會取消其父協程。 return parentHandle?.childCancelled(cause) == true } // 預設情況下 childCancelled() 會取消取消協程 public open fun childCancelled(cause:Throwable): Boolean = cancelImpl(cause) && handlesException
3. 協程的取消
前面分析協程父子關係中的取消協程時,可以知道協程的取消只是在協程的第一層包裝中 AbstractCoroutine 中修改協程的狀態,並沒有影響到第二層包裝中 BaseContinuationImpl 中協程的實際運算邏輯。所以協程的取消只是狀態的變化,並不會取消協程的實際運算邏輯,看下面的程式碼示例:
fun main(args:Array<String>) = runBlocking { val job1 = launch(Dispatchers.Default) { repeat(5) { println("job1 sleep${it +1}times") delay(500) } } delay(700) job1.cancel() val job2 = launch(Dispatchers.Default) { var nextPrintTime = 0L var i = 1 while (i <= 3) { val currentTime = System.currentTimeMillis() if (currentTime >= nextPrintTime) { println("job2 sleep${i++}...") nextPrintTime = currentTime + 500L } } } delay(700) job2.cancel() }
輸出結果如下:
job1 sleep 1 times job1 sleep 2 times job2 sleep 1 ... job2 sleep 2 ... job2 sleep 3 ...
上面程式碼中 job1 取消後,delay()
會檢測協程是否已取消,所以 job1 之後的運算就結束了;而 job2 取消後,沒有檢測協程狀態的邏輯,都是計算邏輯,所以 job2 的運算邏輯還是會繼續執行。
所以為了可以及時取消協程的運算邏輯,可以檢測協程的狀態,使用isActive
來判斷,上面示例中可以將while(i <= 3)
替換為while(isActive)
。
4. 小結
最後總結下本文的內容,封裝非同步程式碼為掛起函式其實非常簡單,只需要用suspendCoroutine{}
或suspendCancellableCoroutine{}
,還要非同步邏輯完成用resume()
或resumeWithException
來恢復協程。
新建協程時需要協程間關係,GlobalScope.launch{}
和GlobalScope.async{}
新建的協程是沒有父協程的,而在協程中使用launch{}
和aysnc{}
一般都是子協程。對於父子協程需要注意下面三種關係:
-
父協程手動呼叫
cancel()
或者異常結束,會立即取消它的所有子協程。 -
父協程必須等待所有子協程完成(處於完成或者取消狀態)才能完成。
-
子協程丟擲未捕獲的異常時,預設情況下會取消其父協程。
對於協程的取消,cancel()
只是將協程的狀態修改為已取消狀態,並不能取消協程的運算邏輯,協程庫中很多掛起函式都會檢測協程狀態,如果想及時取消協程的運算,最好使用isActive
判斷協程狀態。