1. 程式人生 > >Kotlin Coroutines不復雜, 我來幫你理一理

Kotlin Coroutines不復雜, 我來幫你理一理

Coroutines 協程

最近在總結Kotlin的一些東西, 發現協程這塊確實不容易說清楚. 之前的那篇就寫得不好, 所以決定重寫.
反覆研究了官網文件和各種教程部落格, 本篇內容是最基礎也最主要的內容, 力求小白也能看懂並理解.

Coroutines概念

Coroutines(協程), 計算機程式元件, 通過允許任務掛起和恢復執行, 來支援非搶佔式的多工. (見Wiki).

協程主要是為了非同步, 非阻塞的程式碼. 這個概念並不是Kotlin特有的, Go, Python等多個語言中都有支援.

Kotlin Coroutines

Kotlin中用協程來做非同步和非阻塞任務, 主要優點是程式碼可讀性好, 不用回撥函式. (用協程寫的非同步程式碼乍一看很像同步程式碼.)

Kotlin對協程的支援是在語言級別的, 在標準庫中只提供了最低程度的APIs, 然後把很多功能都代理到庫中.

Kotlin中只加了suspend作為關鍵字.
asyncawait不是Kotlin的關鍵字, 也不是標準庫的一部分.

比起futures和promises, kotlin中suspending function的概念為非同步操作提供了一種更安全和不易出錯的抽象.

kotlinx.coroutines是協程的庫, 為了使用它的核心功能, 專案需要增加kotlinx-coroutines-core的依賴.

Coroutines Basics: 協程到底是什麼?

先上一段官方的demo:

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch


fun main() {
    GlobalScope.launch { // launch a new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

這段程式碼的輸出:
先列印Hello, 延遲1s之後, 列印World.

對這段程式碼的解釋:

launch開始了一個計算, 這個計算是可掛起的(suspendable), 它在計算過程中, 釋放了底層的執行緒, 當協程執行完成, 就會恢復(resume).

這種可掛起的計算就叫做一個協程(coroutine). 所以我們可以簡單地說launch開始了一個新的協程.

注意, 主執行緒需要等待協程結束, 如果註釋掉最後一行的Thread.sleep(2000L), 則只打印Hello, 沒有World.

協程和執行緒的關係

coroutine(協程)可以理解為輕量級的執行緒. 多個協程可以並行執行, 互相等待, 互相通訊. 協程和執行緒的最大區別就是協程非常輕量(cheap), 我們可以建立成千上萬個協程而不必考慮效能.

協程是執行線上程上可以被掛起的運算. 可以被掛起, 意味著運算可以被暫停, 從執行緒移除, 儲存在記憶體裡. 此時, 執行緒就可以自由做其他事情. 當計算準備好繼續進行時, 它會返回執行緒(但不一定要是同一個執行緒).

預設情況下, 協程執行在一個共享的執行緒池裡, 執行緒還是存在的, 只是一個執行緒可以執行多個協程, 所以執行緒沒必要太多.

除錯

在上面的程式碼中加上執行緒的名字:

fun main() {
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World! + ${Thread.currentThread().name}") // print after delay
    }
    println("Hello, + ${Thread.currentThread().name}") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

可以在IDE的Edit Configurations中設定VM options: -Dkotlinx.coroutines.debug, 執行程式, 會在log中打印出程式碼執行的協程資訊:

Hello, + main
World! + DefaultDispatcher-worker-1 @coroutine#1

suspend function

上面例子中的delay方法是一個suspend function.
delay()Thread.sleep()的區別是: delay()方法可以在不阻塞執行緒的情況下延遲協程. (It doesn't block a thread, but only suspends the coroutine itself). 而Thread.sleep()則阻塞了當前執行緒.

所以, suspend的意思就是協程作用域被掛起了, 但是當前執行緒中協程作用域之外的程式碼不被阻塞.

如果把GlobalScope.launch替換為thread, delay方法下面會出現紅線報錯:

Suspend functions are only allowed to be called from a coroutine or another suspend function

suspend方法只能在協程或者另一個suspend方法中被呼叫.

在協程等待的過程中, 執行緒會返回執行緒池, 當協程等待結束, 協程會線上程池中一個空閒的執行緒上恢復. (The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.)

啟動協程

啟動一個新的協程, 常用的主要有以下幾種方式:

  • launch
  • async
  • runBlocking

它們被稱為coroutine builders. 不同的庫可以定義其他更多的構建方式.

runBlocking: 連線blocking和non-blocking的世界

runBlocking用來連線阻塞和非阻塞的世界.

runBlocking可以建立一個阻塞當前執行緒的協程. 所以它主要被用來在main函式中或者測試中使用, 作為連線函式.

比如前面的例子可以改寫成:

fun main() = runBlocking<Unit> {
    // start main coroutine
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L)
        println("World! + ${Thread.currentThread().name}")
    }
    println("Hello, + ${Thread.currentThread().name}") // main coroutine continues here immediately
    delay(2000L) // delaying for 2 seconds to keep JVM alive
}

最後不再使用Thread.sleep(), 使用delay()就可以了.
程式輸出:

Hello, + main @coroutine#1
World! + DefaultDispatcher-worker-1 @coroutine#2

launch: 返回Job

上面的例子delay了一段時間來等待一個協程結束, 不是一個好的方法.

launch返回Job, 代表一個協程, 我們可以用Jobjoin()方法來顯式地等待這個協程結束:

fun main() = runBlocking {
    val job = GlobalScope.launch {
        // launch a new coroutine and keep a reference to its Job
        delay(1000L)
        println("World! + ${Thread.currentThread().name}")
    }
    println("Hello, + ${Thread.currentThread().name}")
    job.join() // wait until child coroutine completes
}

輸出結果和上面是一樣的.

Job還有一個重要的用途是cancel(), 用於取消不再需要的協程任務.

async: 從協程返回值

async開啟執行緒, 返回Deferred<T>, Deferred<T>Job的子類, 有一個await()函式, 可以返回協程的結果.

await()也是suspend函式, 只能在協程之內呼叫.

fun main() = runBlocking {
    // @coroutine#1
    println(Thread.currentThread().name)
    val deferred: Deferred<Int> = async {
        // @coroutine#2
        loadData()
    }
    println("waiting..." + Thread.currentThread().name)
    println(deferred.await()) // suspend @coroutine#1
}

suspend fun loadData(): Int {
    println("loading..." + Thread.currentThread().name)
    delay(1000L) // suspend @coroutine#2
    println("loaded!" + Thread.currentThread().name)
    return 42
}

執行結果:

main @coroutine#1
waiting...main @coroutine#1
loading...main @coroutine#2
loaded!main @coroutine#2
42

Context, Dispatcher和Scope

看一下launch方法的宣告:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
...
}

其中有幾個相關概念我們要了解一下.

協程總是在一個context下執行, 型別是介面CoroutineContext. 協程的context是一個索引集合, 其中包含各種元素, 重要元素就有Job和dispatcher. Job代表了這個協程, 那麼dispatcher是做什麼的呢?

構建協程的coroutine builder: launch, async, 都是CoroutineScope型別的擴充套件方法. 檢視CoroutineScope介面, 其中含有CoroutineContext的引用. scope是什麼? 有什麼作用呢?

下面我們就來回答這些問題.

Dispatchers和執行緒

Context中的CoroutineDispatcher可以指定協程執行在什麼執行緒上. 可以是一個指定的執行緒, 執行緒池, 或者不限.

看一個例子:

fun main() = runBlocking<Unit> {
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {
        // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) {
        // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

執行後打印出:

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

API提供了幾種選項:

  • Dispatchers.Default代表使用JVM上的共享執行緒池, 其大小由CPU核數決定, 不過即便是單核也有兩個執行緒. 通常用來做CPU密集型工作, 比如排序或複雜計算等.
  • Dispatchers.Main指定主執行緒, 用來做UI更新相關的事情. (需要新增依賴, 比如kotlinx-coroutines-android.) 如果我們在主執行緒上啟動一個新的協程時, 主執行緒忙碌, 這個協程也會被掛起, 僅當執行緒有空時會被恢復執行.
  • Dispatchers.IO: 採用on-demand建立的執行緒池, 用於網路或者是讀寫檔案的工作.
  • Dispatchers.Unconfined: 不指定特定執行緒, 這是一個特殊的dispatcher.

如果不明確指定dispatcher, 協程將會繼承它被啟動的那個scope的context(其中包含了dispatcher).

在實踐中, 更推薦使用外部scope的dispatcher, 由呼叫方決定上下文. 這樣也方便測試.

newSingleThreadContext建立了一個執行緒來跑協程, 一個專注的執行緒算是一種昂貴的資源, 在實際的應用中需要被釋放或者儲存複用.

切換執行緒還可以用withContext, 可以在指定的協程context下執行程式碼, 掛起直到它結束, 返回結果.
另一種方式是新啟一個協程, 然後用join明確地掛起等待.

在Android這種UI應用中, 比較常見的做法是, 頂部協程用CoroutineDispatchers.Main, 當需要在別的執行緒上做一些事情的時候, 再明確指定一個不同的dispatcher.

Scope是什麼?

launch, asyncrunBlocking開啟新協程的時候, 它們自動建立相應的scope. 所有的這些方法都有一個帶receiver的lambda引數, 預設的receiver型別是CoroutineScope.

IDE會提示this: CoroutineScope:

launch { /* this: CoroutineScope */
}

當我們在runBlocking, launch, 或async的大括號裡面再建立一個新的協程的時候, 自動就在這個scope裡建立:

fun main() = runBlocking {
    /* this: CoroutineScope */
    launch { /* ... */ }
    // the same as:
    this.launch { /* ... */ }
}

因為launch是一個擴充套件方法, 所以上面例子中預設的receiver是this.
這個例子中launch所啟動的協程被稱作外部協程(runBlocking啟動的協程)的child. 這種"parent-child"的關係通過scope傳遞: child在parent的scope中啟動.

協程的父子關係:

  • 當一個協程在另一個協程的scope中被啟動時, 自動繼承其context, 並且新協程的Job會作為父協程Job的child.

所以, 關於scope目前有兩個關鍵知識點:

  • 我們開啟一個協程的時候, 總是在一個CoroutineScope裡.
  • Scope用來管理不同協程之間的父子關係和結構.

協程的父子關係有以下兩個特性:

  • 父協程被取消時, 所有的子協程都被取消.
  • 父協程永遠會等待所有的子協程結束.

值得注意的是, 也可以不啟動協程就建立一個新的scope. 建立scope可以用工廠方法: MainScope()CoroutineScope().

coroutineScope()方法也可以建立scope. 當我們需要以結構化的方式在suspend函式內部啟動新的協程, 我們建立的新的scope, 自動成為suspend函式被呼叫的外部scope的child.

上面的父子關係, 可以進一步抽象到, 沒有parent協程, 由scope來管理其中所有的子協程.

Scope在實際應用中解決什麼問題呢? 如果我們的應用中, 有一個物件是有自己的生命週期的, 但是這個物件又不是協程, 比如Android應用中的Activity, 其中啟動了一些協程來做非同步操作, 更新資料等, 當Activity被銷燬的時候需要取消所有的協程, 來避免記憶體洩漏. 我們就可以利用CoroutineScope來做這件事: 建立一個CoroutineScope物件和activity的生命週期繫結, 或者讓activity實現CoroutineScope介面.

所以, scope的主要作用就是記錄所有的協程, 並且可以取消它們.

A CoroutineScope keeps track of all your coroutines, and it can cancel all of the coroutines started in it.

Structured Concurrency

這種利用scope將協程結構化組織起來的機制, 被稱為"structured concurrency".
好處是:

  • scope自動負責子協程, 子協程的生命和scope繫結.
  • scope可以自動取消所有的子協程.
  • scope自動等待所有的子協程結束. 如果scope和一個parent協程繫結, 父協程會等待這個scope中所有的子協程完成.

通過這種結構化的併發模式: 我們可以在建立top級別的協程時, 指定主要的context一次, 所有巢狀的協程會自動繼承這個context, 只在有需要的時候進行修改即可.

GlobalScope: daemon

GlobalScope啟動的協程都是獨立的, 它們的生命只受到application的限制. 即GlobalScope啟動的協程沒有parent, 和它被啟動時所在的外部的scope沒有關係.

launch(Dispatchers.Default) { ... }GlobalScope.launch { ... }用的dispatcher是一樣的.

GlobalScope啟動的協程並不會保持程序活躍. 它們就像daemon threads(守護執行緒)一樣, 如果JVM發現沒有其他一般的執行緒, 就會關閉.

參考

  • Coroutine Wiki
  • 官方文件 Overview頁
  • 官方文件 Coroutines Guide
  • Asynchronous Programming Techniques
  • Your first coroutine with Kotlin
  • Introduction to Coroutines and Channels
  • Github: Kotlin/kotlinx.coroutines
  • Github: Coroutines Guide
  • Github: KEEP: Kotlin Coroutines

第三方部落格:

  • Coroutines on Android (part I): Getting the background
  • Async Operations with Kotlin Coroutines — Part 1
  • Kotlin Coroutines Tutorial for Android
  • Coroutine Context and Scope