探究高階的Kotlin Coroutines知識
要說程式如何從簡單走向複雜, 執行緒的引入必然功不可沒, 當我們期望利用執行緒來提升程式效能的過程中, 處理執行緒的方式也發生了從原始時代向科技時代發生了一步一步的進化, 正如我們的Elisha大神所著文章The Evolution of Android Network Access 中所講到的, Future可能會是Kotlin Coroutines的時代.
什麼是Coroutines
Coroutines是Kotlin 1.1推出的實驗性的一個擴充套件, 它被定義為一個輕量級的高效的執行緒框架, 並且在1.3版本正式釋出, 去掉Experiment標籤.
如何啟動一個Coroutines
最基礎的建立一個Coroutines的方法就是使用launch
或者async
, 二者的區別是前者返回的是一個Job
, 不帶結果 而後者可以將結果以Deferred<T>
格式返回.
如:
val job = launch { delay(100) }
而通常在Coroutines內執行的函式都會有一個suspend
宣告, 而有suspend
宣告的函式也只能在Coroutines Scope中呼叫.
suspend
的意思是這個函式可以被suspend(掛起), 讓Coroutines來排程它, 這也是為何Kotlin的delay
函式可以不阻塞的進行延遲, 因為它就是一個suspend函式.
Coroutines與執行緒的關係
Coroutines可以簡單理解為一個有佇列的任務鏈, 每一個Coroutines都有自己的Context, 而Context又可以決定其執行的執行緒.
所以可以看到, 並不是起一個Coroutines就是起了一個執行緒, 而只是啟動了一個在某個Scope下執行的協程(Coroutines)罷了. 這裡的Scope (CoroutineScope) 內部包含了一個 Context (CoroutineContext).
public interface CoroutineScope { public val coroutineContext: CoroutineContext }
如果只是通過launch
來啟動一個協程, 那它將會執行在Parent Scope所定義的執行緒中, 但是如果使用GlobalScope.launch
來啟動一個協程, 它將會使用執行緒池中的執行緒來建立一個協程, 執行緒池的大小跟CPU的核數相關.
當然launch
也支援自己傳入一個CoroutinesContext來控制它執行的執行緒, 它叫做CoroutineDispatcher
, 是Context的子類.
上面講了預設的launch
會啟在父Scope(Context)的執行緒中, 而launch(Dispatchers.Default)
則等於GlobalScope.launch
, 還可以通過launch(newSingleThreadContext("MyOwnThread"))
來啟動自己的執行緒, 另外有一個不推薦在general code中出現的launch(Dispatchers.Unconfined)
, 它將會執行在第一個進入Suspend狀態的執行緒中.
可以舉一個簡單的例子:
val job = launch { log("hehe") delay(1000) log("haha") }
這個協程是可以完全在main函式裡執行完的, 即輸出結果為:
hehe haha
因為launch會跑在main的scope中. 如果替換成:
val job = GlobalScope.launch { log("hehe") delay(1000) log("haha") }
則只會輸出hehe
, 因為主執行緒已經結束.
這裡我們可以通過job.join()
來等待子協程執行結束, 這一點跟大家熟知的執行緒的join是一樣.
如何切換Context
如果把Context對應到我們平時認為的執行緒, 那麼這個問題可以類比成如何切換執行緒
.
答案是使用withContext
, 舉一個簡單的栗子.
launch(UI) { updateUI() val result = withContext(IO) { } setView(result) }
它類似於async(IO){ }.await()
.
如何共享資源
執行緒與執行緒之間會涉及到同步與資源競爭的關係, 協程亦是如此.
通常情況下線上程中我們解決問題的方式是加鎖
, 而不正確的使用可能會導致效能下降甚至死鎖(dead lock. 或者在高階語言中使用已經實現執行緒安全的資料型別, 來進行誇執行緒操作。
而我們的Coroutines自然也考慮到了這一點, 它認為我們不應該以共享資源來進行通訊, 而是以通訊來進行資源共享
.
Do not communicate by sharing memory; instead, share memory by communicating.
所以它提出了一個叫做Channel
的東西來在不同的Coroutines之間進行通訊.
譬如我們期望將一堆資料交給兩個並行的協程進行處理, 那麼我們可以把資料放進Channel, 其他的協程從這個Channel進行資料讀取.
launch { for (o in data) { channel.send(o) } channel.close() } launch(One) { for (o in channel) { xxx } } launch(Two) { for (o in channel) { xxx } }
一定要記得關閉channel, 否則從channel讀取資料的協程都將會無限掛起等待資料傳過來.
由於Channel本身實現了iterator
, 所以直接通過in
就可以挨個取出內部的資料.
ReceiveChannel與SendChannel
上一個環節提到的協程之間是通過Channel來進行通訊, 而Channel本身卻是實現了接收管道與傳送管道兩個介面.
我們可以通過producer
函式來進行生成資料, 提供給別的協程, 因為它的返回值是一個ReceiveChannel.
val channel = produce<XXX>() { for (o in data) send(o) }
而且produce自己會做channel close的處理, 省去我們傳送完畢還要掉close的煩惱.
如果我們多個協程需要傳送請求並集中處理, 或者可以叫資料整合, 那麼我們可能需要用到actor
這個函式, 它的返回值是一個SendChannel.
val channel = actor<XXX>() { consumeEach { xxx } } launch(One) { channel.send(xxx) } launch(Two) { channel.send(xxx) }
由於actor
返回的SendChannel有點像是一個郵箱, 它會不斷的接收資料, 所以必須手動關閉才會停止.
多個Channel之間資料如何進行選擇
Coroutines推出一個仍在Experiment階段的關鍵字select
來在多個suspend function中進行選擇第一個到達available的, 其實有點像RxJava的concat+first.
比如我有兩個接收Channel, 但是每一個Channel接收到資料的頻率不得而知, 我想分別從中得到資料, 這裡就需要使用select.
select<Unit> { channel1.onReceive {} channel2.onReceive {} }
如果在配合外圍的迴圈, 就可以做到不斷的去接收兩個Channel的資料.
再比如有兩個傳送Channel都可以處理我的需求, 我也不知道這個時候誰是空閒的, 那也可以通過select來解決.
select<Unit> { channel1.onSend(xxx) {} channel2.onSend(xxx) {} }
有時候兩個Channel是巢狀使用的.
比如一個咖啡店, 他們會不斷的收到Oder, 只有兩個打咖啡的服務員, 咖啡機也只有兩個口,如果我們對這個咖啡店進行抽象. 將Oder存在於一個Channel裡, 服務員接收Order並不斷的把咖啡遞出來, 這也是一個Channel, 咖啡機會不斷接收到服務員需要打咖啡的操作, 也這是一個Channel.
而在這個過程中, 兩個服務員會有一個選擇, 咖啡機的兩個出口也會有一個選擇的過程.
如果抽象成我們的Coroutines程式碼, 或許會是這個樣子:
val orderChannel = producer { for (o in orders) send(o) } val waiter1 = producer { for (o in orderChannel) { pullCoffee(o) } } // waiter2 is the same as 1 val coffeePort1 = actor { consumeEach { //pass coffee through channel inside order it.channel.send(Coffee) it.channel.close() } } // coffeePort2 is the same as 2 pullCoffee { select<Coffee> { coffeePort1.onSend(Request(channel)) { //get coffee from coffeePort channel.recevie() } coffeePort2.onSend .... } } while(someCondition) { select<Coffee> { waiter1.onReceiveOrNull { //上菜了 } waiter2.onReceiveOrNull { //上菜了 } } }
補充說明
協程作為未來non blocking程式設計的方向, 需要大家花時間去理解, 花時間去嘗試, 在此特別推薦這個咖啡小程式幫助大家學習.
https://medium.com/@jagsaund/kotlin-coroutines-channels-csp-android-db441400965f
以及官方的Overview
https://kotlinlang.org/docs/reference/coroutines-overview.html
還有個CheatSheet可以參考
https://blog.kotlin-academy.com/kotlin-coroutines-cheat-sheet-8cf1e284dc35