Kotlin Coroutines(協程) 完全解析(二),深入理解協程的掛起、恢復與排程
Kotlin Coroutines(協程) 完全解析系列:
ofollow,noindex" target="_blank">Kotlin Coroutines(協程) 完全解析(一),協程簡介
Kotlin Coroutines(協程) 完全解析(二),深入理解協程的掛起、恢復與排程
本文基於 Kotlin v1.3.0-rc-146,Kotlin-Coroutines v1.0.0-RC1
前面一篇文章協程簡介,簡單介紹了協程的一些基本概念以及其簡化非同步程式設計的優勢,但是協程與執行緒有什麼區別,協程的掛起與恢復是如何實現的,還有協程執行在哪個執行緒上,依然不是很清楚。這篇文章將分析協程的實現原理,一步步揭開協程的面紗。先來看看協程中最關鍵的掛起函式的實現原理:
1. 掛起函式的工作原理
協程的內部實現使用了 Kotlin 編譯器的一些編譯技術,當掛起函式呼叫時,背後大致細節如下:
掛起函式或掛起 lambda 表示式呼叫時,都有一個隱式的引數額外傳入,這個引數是Continuation
型別,封裝了協程恢復後的執行的程式碼邏輯。
用前文中的一個掛起函式為例:
suspend fun requestToken(): Token { ... }
實際上在 JVM 中更像下面這樣:
ObjectrequestToken(Continuation<Token> cont){ ... }
Continuation
的定義如下,類似於一個通用的回撥介面:
/** * Interface representing a continuation after a suspension point that returns value of type `T`. */ public interface Continuation<in T>{ /** * Context of the coroutine that corresponds to this continuation. */ public val context: CoroutineContext /** * Resumes the execution of the corresponding coroutine passing successful or failed [result] as the * return value of the last suspension point. */ public fun resumeWith(result:Result<T>) }
現在再看之前postItem
函式:
suspend fun requestToken(): Token { ... }// 掛起函式 suspend fun createPost(token:Token, item:Item): Post { ... }// 掛起函式 fun processPost(post:Post) { ... } fun postItem(item:Item) { GlobalScope.launch { val token = requestToken() val post = createPost(token, item) processPost(post) } }
然而,協程內部實現不是使用普通回撥的形式,而是使用狀態機來處理不同的掛起點,大致的 CPS(Continuation Passing Style) 程式碼為:
// 編譯後生成的內部類大致如下 final class postItem$1extends SuspendLambda...{ public final Object invokeSuspend(Object result){ ... switch (this.label) { case 0: this.label = 1; token = requestToken(this) break; case 1: this.label = 2; Token token = result; post = createPost(token, this.item, this) break; case 2: Post post = result; processPost(post) break; } } }
上面程式碼中每一個掛起點和初始掛起點對應的 Continuation 都會轉化為一種狀態,協程恢復只是跳轉到下一種狀態中。掛起函式將執行過程分為多個 Continuation 片段,並且利用狀態機的方式保證各個片段是順序執行的。
1.1 掛起函式可能會掛起協程
掛起函式使用 CPS style 的程式碼來掛起協程,保證掛起點後面的程式碼只能在掛起函式執行完後才能執行,所以掛起函式保證了協程內的順序執行順序。
在多個協程的情況下,掛起函式的作用更加明顯:
fun postItem(item:Item) { GlobalScope.launch { // async { requestToken() } 新建一個協程,可能在另一個執行緒執行 // 但是 await() 是掛起函式,當前協程執行邏輯卡在第一個分支,第一種狀態,當 async 的協程執行完後恢復當前協程,才會切換到下一個分支 val token = async { requestToken() }.await() // 在第二個分支狀態中,又新建一個協程,使用 await 掛起函式將之後程式碼作為 Continuation 放倒下一個分支狀態,直到 async 協程執行完 val post = aync { createPost(token, item) }.await() // 最後一個分支狀態,直接在當前協程處理 processPost(post) } }
上面的例子中,await()
掛起函式掛起當前協程,直到非同步協程完成執行,但是這裡並沒有阻塞執行緒
,是使用狀態機的控制邏輯來實現。而且掛起函式可以保證掛起點之後的程式碼一定在掛起點前程式碼執行完成後才會執行,掛起函式保證順序執行,所以非同步邏輯也可以用順序的程式碼順序來編寫。
注意掛起函式不一定會掛起協程,如果相關呼叫的結果已經可用,庫可以決定繼續進行而不掛起,例如async { requestToken() }
的返回值Deferred
的結果已經可用時,await()
掛起函式可以直接返回結果,不用再掛起協程。
1.2 掛起函式不會阻塞執行緒
掛起函式掛起協程,並不會阻塞協程所在的執行緒,例如協程的delay()
掛起函式會暫停協程一定時間,並不會阻塞協程所線上程,但是Thread.sleep()
函式會阻塞執行緒。
看下面一個例子,兩個協程執行在同一執行緒上:
fun main(args:Array<String>) { // 建立一個單執行緒的協程排程器,下面兩個協程都執行在這同一執行緒上 val coroutineDispatcher = newSingleThreadContext("ctx") // 啟動協程 1 GlobalScope.launch(coroutineDispatcher) { println("the first coroutine") delay(200) println("the first coroutine") } // 啟動協程 2 GlobalScope.launch(coroutineDispatcher) { println("the second coroutine") delay(100) println("the second coroutine") } // 保證 main 執行緒存活,確保上面兩個協程執行完成 Thread.sleep(500) }
執行結果為:
the first coroutine the second coroutine the second coroutine the first coroutine
從上面結果可以看出,當協程 1 暫停 200 ms 時,執行緒並沒有阻塞,而是執行協程 2 的程式碼,然後在 200 ms 時間到後,繼續執行協程 1 的邏輯。所以掛起函式並不會阻塞執行緒,這樣可以節省執行緒資源,協程掛起時,執行緒可以繼續執行其他邏輯。
1.3 掛起函式恢復協程後執行在哪個執行緒
協程的所屬的執行緒排程在前一篇文章《協程簡介》中有提到過,主要是由協程的CoroutineDispatcher
控制,CoroutineDispatcher
可以指定協程執行在某一特定執行緒上、運作線上程池中或者不指定所執行的執行緒。所以協程排程器可以分為Confined dispatcher
和Unconfined dispatcher
,Dispatchers.Default
、Dispatchers.IO
和Dispatchers.Main
屬於Confined dispatcher
,都指定了協程所執行的執行緒或執行緒池,掛起函式恢復後協程也是執行在指定的執行緒或執行緒池上的,而Dispatchers.Unconfined
屬於Unconfined dispatcher
,協程啟動並執行在 Caller Thread 上,但是隻是在第一個掛起點之前是這樣的,掛起恢復後執行在哪個執行緒完全由所呼叫的掛起函式決定。
fun main(args:Array<String>) = runBlocking<Unit> { launch { // 預設繼承 parent coroutine 的 CoroutineDispatcher,指定執行在 main 執行緒 println("main runBlocking: I'm working in thread${Thread.currentThread().name}") delay(100) println("main runBlocking: After delay in thread${Thread.currentThread().name}") } launch(Dispatchers.Unconfined) { println("Unconfined : I'm working in thread${Thread.currentThread().name}") delay(100) println("Unconfined : After delay in thread${Thread.currentThread().name}") } }
輸出如下:
Unconfined: I'm working in thread main main runBlocking: I'm working in thread main Unconfined: After delay in thread kotlinx.coroutines.DefaultExecutor main runBlocking: After delay in thread main
上面第三行輸出,經過delay
掛起函式後,使用Dispatchers.Unconfined
的協程掛起恢復後依然在delay
函式使用的DefaultExecutor
上。
2. 協程深入解析
上面更多地是通過 demo 的方式說明掛起函式函式的一些特性,但是協程的建立、啟動、恢復、執行緒排程、協程切換是如何實現的呢,還是不清楚,下面結合原始碼詳細地解析協程。
2.1 協程的建立與啟動
先從新建一個協程開始分析協程的建立,最常見的協程建立方式為CoroutineScope.launch {}
,關鍵原始碼如下:
public fun CoroutineScope.launch( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope.() -> Unit ): Job { ... coroutine.start(start, coroutine, block) return coroutine }
coroutine.start(start, coroutine, block)
預設情況下會走到startCoroutineCancellable
,最終會呼叫到createCoroutineUnintercepted
。
/** * Creates unintercepted coroutine without receiver and with result type [T]. * This function creates a new, fresh instance of suspendable computation every time it is invoked. * * To start executing the created coroutine, invoke `resume(Unit)` on the returned [Continuation] instance. * The [completion] continuation is invoked when coroutine completes with result or exception. ... */ public actual fun <T>(suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit> { ... }
重點注意該方法的註釋,建立一個協程,建立了一個新的可掛起計算,通過呼叫resume(Unit)
啟動該協程。而且返回值為Continuation
,Continuation
提供了resumeWith
恢復協程的介面,用以實現協程恢復,Continuation
封裝了協程的程式碼執行邏輯和恢復介面。
再看之前協程程式碼編譯生成的內部類final class postItem$1 extends SuspendLambda ...
,協程的計算邏輯封裝在invokeSuspend
方法中,而SuspendLambda
的繼承關係為 SuspendLambda -> ContinuationImpl -> BaseContinuationImpl -> Continuation,其中BaseContinuationImpl
部分關鍵原始碼如下:
internal abstract class BaseContinuationImpl(...) { // 實現 Continuation 的 resumeWith,並且是 final 的,不可被重寫 public final override fun resumeWith(result:Result<Any?>) { ... val outcome = invokeSuspend(param) ... } // 由編譯生成的協程相關類來實現,例如 postItem$1 protected abstract fun invokeSuspend(result:Result<Any?>): Any? }
而這部分與之前的分析也是吻合的,啟動協程流程是resume(Unit)
->resumeWith()
->invokeSuspend()
,協程的掛起通過suspend
掛起函式實現,協程的恢復通過Continuation.resumeWith
實現。
2.2 協程的執行緒排程
協程的執行緒排程是通過攔截器實現的,前面提到了協程啟動呼叫到了startCoroutineCancellable
,該方法實現為:
internal fun <T>(suspend () -> T).startCoroutineCancellable(completion: Continuation<T>) = createCoroutineUnintercepted(completion).intercepted().resumeCancellable(Unit) // createCoroutineUnintercepted(completion) 會建立一個新的協程,返回值型別為 Continuation // intercepted() 是給 Continuation 加上 ContinuationInterceptor 攔截器,也是執行緒排程的關鍵 // resumeCancellable(Unit) 最終將呼叫 resume(Unit) 啟動協程
再看intercepted()
的具體實現:
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> = (this as? ContinuationImpl)?.intercepted() ?: this // ContinuationImpl 是 SuspendLambda 的父類 internal abstract class ContinuationImpl(...) : BaseContinuationImpl(completion) { @Transient private var intercepted: Continuation<Any?>? = null public fun intercepted(): Continuation<Any?> = intercepted ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this) .also { intercepted = it } // intercepted() 方法關鍵是 context[ContinuationInterceptor]?.interceptContinuation(this) // context[ContinuationInterceptor] 就是協程的 CoroutineDispatcher } public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { /** * Returns continuation that wraps the original [continuation], thus intercepting all resumptions. */ public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) }
所以intercepted()
最終會使用協程的CoroutineDispatcher
的interceptContinuation
方法包裝原來的 Continuation,攔截所有的協程執行操作。
DispatchedContinuation
攔截了協程的啟動和恢復,分別是resumeCancellable(Unit)
和重寫的resumeWith(Result)
:
internal class DispatchedContinuation<in T>( @JvmField val dispatcher: CoroutineDispatcher, @JvmField val continuation: Continuation<T> ) : Continuation<T> by continuation, DispatchedTask<T> { inline fun resumeCancellable(value:T) { // 判斷是否需要執行緒排程 if (dispatcher.isDispatchNeeded(context)) { ... // 將協程的運算分發到另一個執行緒 dispatcher.dispatch(context, this) } else { ... // 如果不需要排程,直接在當前執行緒執行協程運算 resumeUndispatched(value) } } override fun resumeWith(result:Result<T>) { // 判斷是否需要執行緒排程 if (dispatcher.isDispatchNeeded(context)) { ... // 將協程的運算分發到另一個執行緒 dispatcher.dispatch(context, this) } else { ... // 如果不需要排程,直接在當前執行緒執行協程運算 continuation.resumeWith(result) } } } internal interface DispatchedTask<in T> :Runnable { public override fun run() { ... // 封裝了 continuation.resume 邏輯 } }
繼續跟蹤newSingleThreadContext()
、Dispatchers.IO
等dispatch
方法的實現,發現其實都呼叫了Executor.execute(Runnable)
方法,而Dispatchers.Unconfined
的實現更簡單,關鍵在於isDispatchNeeded()
返回為false
。
2.3 協程的掛起和恢復
Kotlin 編譯器會生成繼承自SuspendLambda
的子類,協程的真正運算邏輯都在invokeSuspend
中。但是協程掛起的具體實現是如何呢?先看下面示例程式碼:
fun main(args:Array<String>) = runBlocking<Unit> { // 新建並啟動 blocking 協程,執行在 main 執行緒上,等待所有子協程執行完成後才會結束 launch(Dispatchers.Unconfined) { // 新建並啟動 launch 協程,沒有指定所執行執行緒,一開始執行在呼叫者所在的 main 執行緒上 println("${Thread.currentThread().name}: launch start") async(Dispatchers.Default) { // 新建並啟動 async 協程,執行在 Dispatchers.Default 的執行緒池中 println("${Thread.currentThread().name}: async start") delay(100)// 掛起 async 協程 100 ms println("${Thread.currentThread().name}: async end") }.await() // 掛起 launch 協程,直到 async 協程結束 println("${Thread.currentThread().name}: launch end") } }
其中 launch 協程編譯生成的 SuspendLambda 子類的invokeSuspend
方法如下:
public final Object invokeSuspend(@NotNull Object result) { Object coroutine_suspended = IntrinsicsKt.getCOROUTINE_SUSPENDED(); switch (this.label) { case 0: ... System.out.println(stringBuilder.append(currentThread.getName()).append(" : launch start").toString()); // 新建並啟動 async 協程 Deferred async$default = BuildersKt.async$default(coroutineScope, (CoroutineContext) Dispatchers.getDefault(), null, (Function2) new 1(null), 2, null); this.label = 1; // 呼叫 await() 掛起函式 if (async$default.await(this) == coroutine_suspended) { return coroutine_suspended; } break; case 1: if (result instanceof Failure) { throw ((Failure) result).exception; } // 恢復協程後再執行一次 resumeWith(),然後無異常的話執行最後的 println() break; default: throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine"); } ... System.out.println(stringBuilder2.append(currentThread2.getName()).append(" : launch end").toString()); return Unit.INSTANCE; }
上面程式碼中 launch 協程掛起的關鍵在於async$default.await(this) == coroutine_suspended
,如果此時 async 執行緒未執行完成,await()
返回為IntrinsicsKt.getCOROUTINE_SUSPENDED()
,就會 return,launch 協程的invokeSuspend
方法執行完成,協程所線上程繼續往下執行,此時 launch 執行緒處於掛起狀態。所以協程掛起就是協程掛起點之前邏輯執行完成,協程的運算關鍵方法resumeWith()
執行完成,執行緒繼續執行往下執行其他邏輯。
協程掛起有三點需要注意的:
-
啟動其他協程並不會掛起當前協程,所以
launch
和async
啟動執行緒時,除非新協程執行在當前執行緒,則當前協程只能在新協程執行完成後繼續執行,否則當前協程都會馬上繼續執行。 -
協程掛起並不會阻塞執行緒,因為協程掛起時相當於執行完協程的方法,執行緒繼續執行其他之後的邏輯。
-
掛起函式並一定都會掛起協程,例如
await()
掛起函式如果返回值不等於IntrinsicsKt.getCOROUTINE_SUSPENDED()
,則協程繼續執行掛起點之後邏輯。
下面繼續分析await()
的實現原理,它的實現中關鍵是呼叫了JobSupport.awaitSuspend()
方法:
private suspend fun awaitSuspend(): Any? = suspendCoroutineUninterceptedOrReturn { uCont -> /* * Custom code here, so that parent coroutine that is using await * on its child deferred (async) coroutine would throw the exception that this child had * thrown and not a JobCancellationException. */ val cont = AwaitContinuation(uCont.intercepted(), this) cont.initCancellability() invokeOnCompletion(ResumeAwaitOnCompletion(this, cont).asHandler) cont.getResult() } private class ResumeAwaitOnCompletion<T>( job: JobSupport, private val continuation: AbstractContinuation<T> ) : JobNode<JobSupport>(job) { override fun invoke(cause:Throwable?) { val state = job.state check(state !is Incomplete) if (state is CompletedExceptionally) { // Resume with exception in atomic way to preserve exception continuation.resumeWithExceptionMode(state.cause, MODE_ATOMIC_DEFAULT) } else { // Resuming with value in a cancellable way (AwaitContinuation is configured for this mode). @Suppress("UNCHECKED_CAST") continuation.resume(state as T) } } override fun toString() = "ResumeAwaitOnCompletion[$continuation]" }
上面原始碼中ResumeAwaitOnCompletion
的invoke
方法的邏輯就是呼叫continuation.resume(state as T)
恢復協程。invokeOnCompletion
函式裡面是如何實現 async 協程完成後自動恢復之前協程的呢,原始碼實現有些複雜,因為很多邊界情況處理就不全部展開,其中最關鍵的邏輯如下:
// handler 就是 ResumeAwaitOnCompletion 的例項,將 handler 作為節點 val node = nodeCache ?: makeNode(handler, onCancelling).also { nodeCache = it } // 將 node 節點新增到 state.list 中 if (!addLastAtomic(state, list, node)) return@loopOnState // retry
接下來我斷點除錯 launch 協程恢復的過程,從 async 協程的SuspendLambda
的子類的completion.resumeWith(outcome)
->AbstractCoroutine.resumeWith(result)
..->JobSupport.tryFinalizeSimpleState()
->JobSupport.completeStateFinalization()
->state.list?.notifyCompletion(cause)
->node.invoke
,最後 handler 節點裡面通過呼叫resume(result)
恢復協程。
而這過程中有兩個final
的resumeWith
方法,一個是SuspendLambda
的父類BaseContinuationImpl
的,我們再來詳細分析一篇:
internal abstract class BaseContinuationImpl( public val completion: Continuation<Any?>? ) : Continuation<Any?>, CoroutineStackFrame, Serializable { public final override fun resumeWith(result:Result<Any?>) { ... var param = result while (true) { with(current) { val completion = completion!! val outcome: Result<Any?> = try { // 呼叫 invokeSuspend 方法執行,執行協程的真正運算邏輯 val outcome = invokeSuspend(param) // 協程掛起時 invokeSuspend 才會返回 COROUTINE_SUSPENDED,所以協程掛起時,其實只是協程的 resumeWith 執行邏輯執行完成,再次呼叫 resumeWith 時,協程掛起點之後的邏輯才能繼續執行 if (outcome === COROUTINE_SUSPENDED) return Result.success(outcome) } catch (exception: Throwable) { Result.failure(exception) } releaseIntercepted() // this state machine instance is terminating // 這裡可以看出 Continuation 其實分為兩類,一種是 BaseContinuationImpl,封裝了協程的真正運算邏輯 if (completion is BaseContinuationImpl) { // unrolling recursion via loop current = completion param = outcome } else { // 斷點時發現 completion 是 DeferredCoroutine 例項,這裡實際呼叫的是其父類 AbstractCoroutine 的 resumeWith 方法 completion.resumeWith(outcome) return } } } } }
接下來再來看另外一類 Continuation,AbstractCoroutine 的resumeWith
實現:
public abstract class AbstractCoroutine<in T>( @JvmField protected val parentContext: CoroutineContext, active: Boolean = true ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { /** * Completes execution of this with coroutine with the specified result. */ public final override fun resumeWith(result:Result<T>) { // makeCompletingOnce 大致實現是修改協程狀態,如果需要的話還會將結果返回給呼叫者協程,並恢復呼叫者協程 makeCompletingOnce(result.toState(), defaultResumeMode) } }
所以其中一類 ContinuationBaseContinuationImpl
的resumeWith
封裝了協程的運算邏輯,用以協程的啟動和恢復;而另一類 ContinuationAbstractCoroutine
,主要是負責維護協程的狀態和管理,它的resumeWith
則是完成協程,恢復呼叫者協程。
2.4 協程的三層包裝
常用的launch
和async
返回的Job
、Deferred
,裡面封裝了協程狀態,提供了取消協程介面,而它們的例項都是繼承自AbstractCoroutine
,它是協程的第一層包裝。第二層包裝是編譯器生成的SuspendLambda
的子類,封裝了協程的真正運算邏輯,繼承自BaseContinuationImpl
,其中completion
屬性就是協程的第一層包裝。第三層包裝是前面分析協程的執行緒排程時提到的DispatchedContinuation
,封裝了執行緒排程邏輯,包含了協程的第二層包裝。三層包裝都實現了Continuation
介面,通過代理模式將協程的各層包裝組合在一起,每層負責不同的功能。
下面是協程執行的流程圖:
3. 小結
經過以上解析之後,再來看協程就是一段可以掛起和恢復執行的運算邏輯,而協程的掛起是通過掛起函式實現的,掛起函式用狀態機的方式用掛起點將協程的運算邏輯拆分為不同的片段,每次執行協程執行的不同的邏輯片段。所以協程有兩個很大的好處:一是簡化非同步程式設計,支援非同步返回;而是掛起不阻塞執行緒,提供執行緒利用率。