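Kotlin Coroutine Source Code Analysis (Part 1)
References
As an exception to my usual habit, I'll recommend some good background material first. Without it this post will be hard to follow, because I won't start from the basics.
Kotlin Coroutines Complete Analysis (Part 2): A Deep Look at Suspension, Resumption, and Dispatching (original title: Kotlin Coroutines(協程) 完全解析(二),深入理解協程的掛起、恢復與排程)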
CoroutineContext
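The coroutine's context. You can fetch any element associated with the current context via context[key], where key is that element's CoroutineContext.Key. The interceptor we will meet later is attached to the context in the same way, because interface ContinuationInterceptor : CoroutineContext.Element.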
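To make the lookup concrete, here is a minimal sketch that uses only kotlin.coroutines from the standard library. RequestId, lookupDemo and missingDemo are hypothetical names invented for this illustration; the element pattern is the same one the FilePath class uses later in this post.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext

// Hypothetical element: the companion object doubles as the lookup key.
class RequestId(val id: String) : AbstractCoroutineContextElement(RequestId) {
    companion object Key : CoroutineContext.Key<RequestId>
}

// context[Key] returns the element registered under that key, already typed.
fun lookupDemo(): String? {
    val context: CoroutineContext = RequestId("req-1")
    return context[RequestId]?.id
}

// A context without the element simply yields null.
fun missingDemo(): Boolean = EmptyCoroutineContext[RequestId] == null
```

Calling lookupDemo() from a main function returns the id stored in the element, and missingDemo() confirms that an absent key gives null rather than throwing.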
plus
When working with a context, we often see:
Dispatchers.IO + job
@NotNull
public final CoroutineContext invoke(@NotNull CoroutineContext paramCoroutineContext, @NotNull CoroutineContext.Element paramElement) {
    Intrinsics.checkParameterIsNotNull(paramCoroutineContext, "acc");
    Intrinsics.checkParameterIsNotNull(paramElement, "element");
    CoroutineContext localCoroutineContext1 = paramCoroutineContext.minusKey(paramElement.getKey());
    if (localCoroutineContext1 == EmptyCoroutineContext.INSTANCE) {
        return (CoroutineContext)paramElement;
    }
    ContinuationInterceptor localContinuationInterceptor = (ContinuationInterceptor)localCoroutineContext1.get((CoroutineContext.Key)ContinuationInterceptor.Key);
    CombinedContext localCombinedContext;
    if (localContinuationInterceptor == null) {
        localCombinedContext = new CombinedContext(localCoroutineContext1, paramElement);
    } else {
        CoroutineContext localCoroutineContext2 = localCoroutineContext1.minusKey((CoroutineContext.Key)ContinuationInterceptor.Key);
        if (localCoroutineContext2 == EmptyCoroutineContext.INSTANCE) {
            localCombinedContext = new CombinedContext((CoroutineContext)paramElement, (CoroutineContext.Element)localContinuationInterceptor);
        } else {
            localCombinedContext = new CombinedContext((CoroutineContext)new CombinedContext(localCoroutineContext2, paramElement), (CoroutineContext.Element)localContinuationInterceptor);
        }
    }
    return (CoroutineContext)localCombinedContext;
}
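The decompiled plus logic has two visible effects: an element replaces any existing element with the same key, and the ContinuationInterceptor is always moved to the last position. The sketch below shows both using the standard library only. Tag, Label, PassThroughInterceptor and elementOrder are hypothetical names made up for this illustration.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.Continuation
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Two hypothetical plain elements with distinct keys.
class Tag(val name: String) : AbstractCoroutineContextElement(Tag) {
    companion object Key : CoroutineContext.Key<Tag>
}

class Label(val name: String) : AbstractCoroutineContextElement(Label) {
    companion object Key : CoroutineContext.Key<Label>
}

// Hypothetical interceptor that does nothing; it only needs the interceptor key.
object PassThroughInterceptor :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        continuation
}

// Lists the elements in fold order (left to right inside CombinedContext).
fun elementOrder(context: CoroutineContext): List<String> =
    context.fold(emptyList<String>()) { acc, element ->
        acc + (element::class.simpleName ?: "?")
    }

// Same key on both sides: the right-hand element wins.
fun replacedName(): String? = (Tag("a") + Tag("b"))[Tag]?.name
```

Whatever order the operands are written in, elementOrder(PassThroughInterceptor + Tag("a") + Label("x")) lists the interceptor last, which is what keeps interceptor lookup cheap.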
Continuation
BaseContinuationImpl
It takes a completion, which is the continuation to invoke once this coroutine has finished.

[Figure: BaseContinuationImpl.png]
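resumeWith: the essential part
It first calls its own invokeSuspend(paramObject). If that returns IntrinsicsKt.getCOROUTINE_SUSPENDED(), the coroutine needs to suspend and resumeWith returns immediately. Otherwise the result (or the thrown exception) is wrapped in a Result and handed to completion, looping for as long as completion is itself a BaseContinuationImpl.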
public final void resumeWith(@NotNull Object paramObject) {
    DebugProbesKt.probeCoroutineResumed((Continuation)this);
    BaseContinuationImpl localBaseContinuationImpl1 = (BaseContinuationImpl)this;
    Continuation localContinuation;
    Object localObject2;
    for (Object localObject1 = paramObject;; localObject1 = localObject2) {
        BaseContinuationImpl localBaseContinuationImpl2 = localBaseContinuationImpl1;
        localContinuation = localBaseContinuationImpl2.completion;
        if (localContinuation == null) {
            Intrinsics.throwNpe();
        }
        try {
            Object localObject3 = localBaseContinuationImpl2.invokeSuspend(localObject1);
            if (localObject3 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                return;
            }
            localObject2 = Result.constructor-impl(localObject3);
        } catch (Throwable localThrowable) {
            localObject2 = Result.constructor-impl(ResultKt.createFailure(localThrowable));
        }
        localBaseContinuationImpl2.releaseIntercepted();
        if (!(localContinuation instanceof BaseContinuationImpl)) {
            break;
        }
        localBaseContinuationImpl1 = (BaseContinuationImpl)localContinuation;
    }
    localContinuation.resumeWith(localObject2);
}
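The effect of that loop can be observed from ordinary Kotlin code: resuming the innermost suspension point lets the value travel through every enclosing suspend frame until it reaches the final completion. The following is a small sketch under that reading, using only the standard library. Parked, innerStep, outerStep and resumeThroughChain are hypothetical names for this illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Hypothetical holder for the continuation captured at the suspension point.
class Parked {
    var continuation: Continuation<Int>? = null
}

// Suspends and stores the continuation instead of resuming it.
suspend fun innerStep(parked: Parked): Int =
    suspendCoroutine { cont -> parked.continuation = cont }

// An enclosing frame that still has work to do after innerStep returns.
suspend fun outerStep(parked: Parked): Int = innerStep(parked) + 1

fun resumeThroughChain(): List<String> {
    val events = mutableListOf<String>()
    val parked = Parked()
    val block: suspend () -> Int = { outerStep(parked) * 2 }
    block.startCoroutine(Continuation<Int>(EmptyCoroutineContext) { result ->
        events += "completed: ${result.getOrNull()}"
    })
    events += "suspended"              // startCoroutine returned without a result
    parked.continuation!!.resume(20)   // (20 + 1) * 2 reaches the completion
    return events
}
```

A single resume call is enough to finish outerStep, the lambda, and the completion callback in one go, without the caller knowing how many frames sit in between.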
ContinuationImpl
Builds on BaseContinuationImpl and adds intercepted().
SafeContinuation
suspend
時會在外面套一層 safeContinuation
,判斷當前是繼續執行還是 resume
public SafeContinuation(@NotNull Continuation<? super T> paramContinuation) {
    this(paramContinuation, CoroutineSingletons.UNDECIDED);
}
resumeWith
public void resumeWith(@NotNull Object paramObject) {
    for (;;) {
        Object localObject = this.result;
        if (localObject == CoroutineSingletons.UNDECIDED) {
            // Decompiler artifact: the Kotlin source returns here when the CAS succeeds.
            if (!RESULT.compareAndSet(this, CoroutineSingletons.UNDECIDED, paramObject)) {}
        } else {
            if (localObject != IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                break;
            }
            if (RESULT.compareAndSet(this, IntrinsicsKt.getCOROUTINE_SUSPENDED(), CoroutineSingletons.RESUMED)) {
                this.delegate.resumeWith(paramObject);
                return;
            }
        }
    }
    throw ((Throwable)new IllegalStateException("Already resumed"));
}
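When the continuation is resumed, the state can change in one of two ways:
- CoroutineSingletons.UNDECIDED -> paramObject: the result arrives before anyone has asked for it, so it is simply stored.
- COROUTINE_SUSPENDED -> CoroutineSingletons.RESUMED: the coroutine was already suspended, so the state becomes RESUMED and this.delegate.resumeWith(paramObject) is called.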
getOrThrow
@Nullable
public final Object getOrThrow() {
    Object localObject = this.result;
    if (localObject == CoroutineSingletons.UNDECIDED) {
        if (RESULT.compareAndSet(this, CoroutineSingletons.UNDECIDED, IntrinsicsKt.getCOROUTINE_SUSPENDED())) {
            return IntrinsicsKt.getCOROUTINE_SUSPENDED();
        }
        localObject = this.result;
    }
    if (localObject == CoroutineSingletons.RESUMED) {
        return IntrinsicsKt.getCOROUTINE_SUSPENDED();
    }
    if ((localObject instanceof Result.Failure)) {
        throw ((Result.Failure)localObject).exception;
    }
    return localObject;
}
If result is still CoroutineSingletons.UNDECIDED when the caller asks for it, the coroutine has to suspend, and the state changes to IntrinsicsKt.getCOROUTINE_SUSPENDED().
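Both paths through SafeContinuation can be observed with plain standard-library calls. In this sketch, runSync resumes inside the suspendCoroutine block (UNDECIDED -> result, so nothing actually suspends), while runDeferred keeps the continuation and resumes it only after startCoroutine has returned (COROUTINE_SUSPENDED -> RESUMED). The function names and event strings are hypothetical, chosen for this illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

// Resume happens before getOrThrow: the coroutine never really suspends.
fun runSync(): List<String> {
    val events = mutableListOf<String>()
    val block: suspend () -> String = {
        events += "before suspend"
        val value = suspendCoroutine<String> { cont ->
            cont.resume("value")
            events += "after resume"
        }
        events += "after suspend: $value"
        value
    }
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        events += "completed: ${result.getOrNull()}"
    })
    events += "startCoroutine returned"
    return events
}

// getOrThrow runs first and sees UNDECIDED, so the coroutine suspends.
fun runDeferred(): List<String> {
    val events = mutableListOf<String>()
    var saved: Continuation<String>? = null
    val block: suspend () -> String = {
        events += "before suspend"
        val value = suspendCoroutine<String> { cont -> saved = cont }
        events += "after suspend: $value"
        value
    }
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        events += "completed: ${result.getOrNull()}"
    })
    events += "startCoroutine returned"
    saved!!.resume("late value")
    return events
}
```

Comparing the two event lists shows where "startCoroutine returned" lands: at the very end in the synchronous case, and before the coroutine body continues in the deferred case.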
CombinedContext
We often see:
context get() = job() + IO....
Here the overloaded + operator produces a CombinedContext.
IntrinsicsKt__IntrinsicsJvmKt
createCoroutineUnintercepted
Let's just look at the simpler part:
if ((paramFunction2 instanceof BaseContinuationImpl)) {
    return ((BaseContinuationImpl)paramFunction2).create(paramR, localContinuation);
}
private static final <T> Object startCoroutineUninterceptedOrReturn(@NotNull Function1<? super Continuation<? super T>, ? extends Object> paramFunction1, Continuation<? super T> paramContinuation) {
    if (paramFunction1 == null) {
        throw new TypeCastException("null cannot be cast to non-null type (kotlin.coroutines.Continuation<T>) -> kotlin.Any?");
    }
    return ((Function1)TypeIntrinsics.beforeCheckcastToFunctionOfArity(paramFunction1, 1)).invoke(paramContinuation);
}
This is really just function.invoke().
A concrete example
import kotlin.coroutines.*

// Note: log() is a logging helper defined elsewhere (not shown).
fun main(args: Array<String>) {
    log("before coroutine")
    // Start our coroutine
    asyncCalcMd5("test.zip") {
        log("in coroutine. Before suspend.")
        // Suspend the coroutine and start a time-consuming operation
        val result: String = suspendCoroutine { continuation ->
            log("in suspend block.")
            continuation.resume(calcMd5(continuation.context[FilePath]!!.path))
            log("after resume.")
        }
        log("in coroutine. After suspend. result = $result")
    }
    log("after coroutine")
}

/**
 * Context element that carries the information we need; it can be customized freely.
 */
class FilePath(val path: String) : AbstractCoroutineContextElement(FilePath) {
    companion object Key : CoroutineContext.Key<FilePath>
}

fun asyncCalcMd5(path: String, block: suspend () -> Unit) {
    val continuation = object : Continuation<Unit> {
        override fun resumeWith(result: Result<Unit>) {
            log("resume: $result")
        }

        override val context: CoroutineContext
            get() = FilePath(path)
    }
    block.startCoroutine(continuation)
}

fun calcMd5(path: String): String {
    log("calc md5 for $path.")
    // Simulate a time-consuming operation for now
    Thread.sleep(1000)
    // Pretend this is the MD5 value we computed
    return System.currentTimeMillis().toString()
}
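Here is how block gets transformed. The continuation passed in is the one given to block.startCoroutine(continuation). The compiler turns the startCoroutine extension call into the static call startCoroutine(Function, Continuation), where:
Function: block
Continuation: val continuation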
startCoroutine
@SinceKotlin(version="1.3")
public static final <R, T> void startCoroutine(@NotNull Function2<? super R, ? super Continuation<? super T>, ? extends Object> paramFunction2, R paramR, @NotNull Continuation<? super T> paramContinuation) {
    Intrinsics.checkParameterIsNotNull(paramFunction2, "receiver$0");
    Intrinsics.checkParameterIsNotNull(paramContinuation, "completion");
    Continuation localContinuation = IntrinsicsKt.intercepted(IntrinsicsKt.createCoroutineUnintercepted(paramFunction2, paramR, paramContinuation));
    Unit localUnit = Unit.INSTANCE;
    localContinuation.resumeWith(Result.constructor-impl(localUnit));
}
Create first, then resume.
Creation takes two steps:
- IntrinsicsKt.createCoroutineUnintercepted(paramFunction2, paramR, paramContinuation), which boils down to:
  if ((paramFunction1 instanceof BaseContinuationImpl)) {
      return ((BaseContinuationImpl)paramFunction1).create(localContinuation);
  }
- IntrinsicsKt.intercepted
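The same "create first, then resume" split is available from Kotlin through the standard library's createCoroutine, which returns a Continuation<Unit> that has not started yet. A minimal sketch follows; createThenResume and its event strings are hypothetical names for this illustration.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.createCoroutine
import kotlin.coroutines.resume

fun createThenResume(): List<String> {
    val events = mutableListOf<String>()
    val block: suspend () -> Int = {
        events += "body runs"
        42
    }
    // Step 1: create. Nothing in the body has executed yet.
    val coroutine: Continuation<Unit> = block.createCoroutine(
        Continuation<Int>(EmptyCoroutineContext) { result ->
            events += "completed: ${result.getOrNull()}"
        }
    )
    events += "created"
    // Step 2: resume with Unit, which starts the body.
    coroutine.resume(Unit)
    return events
}
```

The "created" event is recorded before "body runs", which matches the order seen in the decompiled startCoroutine: build the continuation, then call resumeWith on it with Unit.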
So let's look at what kind of paramFunction block is compiled into:
final class block extends SuspendLambda implements Function1<Continuation<? super Unit>, Object> {
    @NotNull
    public final Continuation<Unit> create(@NotNull Continuation<?> paramContinuation) {
        // create() produces the BaseContinuationImpl instance
        Intrinsics.checkParameterIsNotNull(paramContinuation, "completion");
        return new 1(this.this$0, paramContinuation);
    }

    public final Object invoke(Object paramObject) {
        return ((1)create((Continuation)paramObject)).invokeSuspend(Unit.INSTANCE);
    }

    // After resumeWith, invokeSuspend is the first thing to run
    @Nullable
    public final Object invokeSuspend(@NotNull Object paramObject) {
        Object localObject1 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        Object localObject2;
        switch (this.label) {
        default:
            throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        case 1:
            ((1)this.L$0);
            if (!(paramObject instanceof Result.Failure)) {
                localObject2 = paramObject;
            } else {
                throw ((Result.Failure)paramObject).exception;
            }
            break;
        case 0:
            if ((paramObject instanceof Result.Failure)) {
                break label276;
            }
            this.this$0.log("in coroutine. Before suspend.");
            this.L$0 = this;
            this.label = 1;
            SafeContinuation localSafeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(this));
            Continuation localContinuation = (Continuation)localSafeContinuation;
            this.this$0.log("in suspend block.");
            HongMoActivity localHongMoActivity1 = this.this$0;
            CoroutineContext.Element localElement = localContinuation.getContext().get((CoroutineContext.Key)HongMoActivity.FilePath.Key);
            if (localElement == null) {
                Intrinsics.throwNpe();
            }
            String str1 = localHongMoActivity1.calcMd5(((HongMoActivity.FilePath)localElement).getPath());
            localContinuation.resumeWith(Result.constructor-impl(str1));
            this.this$0.log("after resume.");
            localObject2 = localSafeContinuation.getOrThrow();
            if (localObject2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                DebugProbesKt.probeCoroutineSuspended(this);
            }
            if (localObject2 == localObject1) {
                return localObject1;
            }
            break;
        }
        String str2 = (String)localObject2;
        HongMoActivity localHongMoActivity2 = this.this$0;
        StringBuilder localStringBuilder = new StringBuilder();
        localStringBuilder.append("in coroutine. After suspend. result = ");
        localStringBuilder.append(str2);
        localHongMoActivity2.log(localStringBuilder.toString());
        return Unit.INSTANCE;
        label276:
        throw ((Result.Failure)paramObject).exception;
    }
}
After the continuation is wrapped in a SafeContinuation, the result is computed and delivered through resumeWith + getOrThrow, then passed on to completion.
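To see the label mechanism without the decompiler noise, here is a hand-written toy state machine in Kotlin. It is only a rough sketch of the idea, not what the compiler actually emits: ToyStateMachine, runToy and the event strings are hypothetical, and the real generated class extends SuspendLambda and wraps itself in a SafeContinuation.

```kotlin
import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.resume

// Toy continuation: `label` remembers which part of the body runs next.
class ToyStateMachine(
    private val completion: Continuation<String>,
    private val events: MutableList<String>,
) : Continuation<Any?> {
    private var label = 0

    override val context: CoroutineContext
        get() = completion.context

    // Mirrors invokeSuspend: run one segment, then suspend or produce the result.
    private fun invokeSuspend(value: Any?): Any? = when (label) {
        0 -> {
            events += "before suspend"
            label = 1                 // the next resume continues at segment 1
            COROUTINE_SUSPENDED
        }
        1 -> {
            events += "after suspend: $value"
            "done"
        }
        else -> error("already finished")
    }

    // Mirrors BaseContinuationImpl.resumeWith for a single frame.
    override fun resumeWith(result: Result<Any?>) {
        val outcome = invokeSuspend(result.getOrThrow())
        if (outcome === COROUTINE_SUSPENDED) return
        completion.resume(outcome as String)
    }
}

fun runToy(): List<String> {
    val events = mutableListOf<String>()
    val machine = ToyStateMachine(
        Continuation<String>(EmptyCoroutineContext) { result ->
            events += "completed: ${result.getOrNull()}"
        },
        events,
    )
    machine.resume(Unit)      // label 0: runs the first segment and suspends
    events += "suspended"
    machine.resume("md5")     // label 1: continues with the delivered value
    return events
}
```

Each resume call enters invokeSuspend once, and the label decides which segment runs, just like the switch on this.label in the decompiled class.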
The key code is:
public suspend inline fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
    suspendCoroutineUninterceptedOrReturn { c: Continuation<T> ->
        val safe = SafeContinuation(c.intercepted())
        block(safe)
        safe.getOrThrow()
    }
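This gets compiled into CPS (continuation-passing style) code:
- When suspendCoroutine is used, the continuation is first wrapped in a SafeContinuation.
- If getOrThrow returns COROUTINE_SUSPENDED, invokeSuspend returns right away.
- Once control is back in the BaseContinuationImpl loop shown earlier, completion is not a BaseContinuationImpl, so the loop breaks and completion.resumeWith finishes the coroutine.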
The asynchronous case
case 0:
    if ((paramObject instanceof Result.Failure)) {
        break label228;
    }
    this.this$0.log("in coroutine. Before suspend.");
    this.L$0 = this;
    this.label = 1;
    SafeContinuation localSafeContinuation = new SafeContinuation(IntrinsicsKt.intercepted(this));
    Continuation localContinuation = (Continuation)localSafeContinuation;
    this.this$0.log("in suspend block.");
    HongMoActivity.access$getExecutor$p(this.this$0).submit((Runnable)new HongMoActivity.main.1.invokeSuspend..inlined.suspendCoroutine.lambda.1(localContinuation, this));
    localObject2 = localSafeContinuation.getOrThrow();
    if (localObject2 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
        DebugProbesKt.probeCoroutineSuspended(this);
    }
    if (localObject2 == localObject1) {
        return localObject1;
    }
    break;
}
As you can see, getOrThrow is called right after submit. At that point it returns COROUTINE_SUSPENDED, so invokeSuspend returns immediately.
Inside the Runnable:
public final void run() {
    Continuation localContinuation = this.$continuation;
    HongMoActivity localHongMoActivity = this.this$0.this$0;
    CoroutineContext.Element localElement = this.$continuation.getContext().get((CoroutineContext.Key)HongMoActivity.FilePath.Key);
    if (localElement == null) {
        Intrinsics.throwNpe();
    }
    String str = localHongMoActivity.calcMd5(((HongMoActivity.FilePath)localElement).getPath());
    localContinuation.resumeWith(Result.constructor-impl(str));
    this.this$0.this$0.log("after resume.");
}
When resumeWith is called on the SafeContinuation this time, execution re-enters invokeSuspend at label = 1 and the return value is handed to completion.
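The Kotlin source for the asynchronous variant is not shown above, so the following is only a guess at its general shape: the suspendCoroutine block submits work to an executor and returns, and the worker thread resumes the continuation later. runOnExecutor, the single-thread executor, the latch and the placeholder result string are all assumptions made for this sketch.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.coroutines.Continuation
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.resume
import kotlin.coroutines.startCoroutine
import kotlin.coroutines.suspendCoroutine

fun runOnExecutor(): String {
    val executor = Executors.newSingleThreadExecutor()
    val finished = CountDownLatch(1)
    var outcome = ""
    val block: suspend () -> String = {
        suspendCoroutine<String> { cont ->
            // The block returns right after submit; resume happens on the worker thread.
            executor.submit(Runnable { cont.resume("md5-of-test.zip") })
        }
    }
    block.startCoroutine(Continuation<String>(EmptyCoroutineContext) { result ->
        outcome = result.getOrThrow()
        finished.countDown()      // signal the calling thread that the coroutine is done
    })
    finished.await()              // startCoroutine may return before the result exists
    executor.shutdown()
    return outcome
}
```

The latch is needed only because this sketch wants to return the value synchronously; in the original example the completion just logs the result whenever it arrives.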
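Summary
suspend marks a suspension point: when the thread reaches it, execution may simply break out and return. At the same time a continuation is created to represent the point to continue from, which is easy to picture: the next time it is resumed, execution carries on from that continuation.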
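To be continued
- How async, launch and friends simplify working with coroutines
- How intercept works
- What CombinedContext is really for
- How asynchronous code gets hold of the continuation and resumes it
- What to use in place of suspendCoroutine
Next time I'll keep working through the questions above. By the way, happy New Year! This post was delayed for far too long, which is embarrassing.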