1. 程式人生 > >kotlin中非同步處理框架-協程(Coroutines)的使用和配置

kotlin中非同步處理框架-協程(Coroutines)的使用和配置

前言:
本人通過對kotlin由淺到深使用了一年半,並且多個專案已基本達到純kotlin開發。越來越覺得kotlin對於開發Android來說,不僅僅是多了一門開發語言,更是提升開發效率和優化程式設計的利器!值得Android開發者放心學習和語言轉換,有Google做靠山,不用擔心kotlin的前景問題。用了kotlin,你會發現其他所有語言在開發Android時,都顯得那麼臃腫和囉嗦。

今天我們主要簡單講一下kotlin的基礎配置和非同步處理框架-協程(coroutines)的配置。之所以將kotlin的基礎配置和協程放在一起,是因為協程的使用只能在kotlin語言環境裡,不能在Java裡使用,況且我們任何一個App的開發,肯定都會涉及到非同步耗時任務的執行至於為什麼要特意講一下協程,你在網上搜一下就知道,這個框架是多麼強大和神奇!

不僅是我們處理非同步任務時的簡單高效的工具,更是結合了原來
Thread+Handler+Runable+RxAndroid這幾大切換器和控制元件的使用場景,是我們處理非同步任務的不二選擇!

下面我們直接通過程式碼看看我們需要乾的幾件事:
1、專案根目錄gradle中:

buildscript {
    ext.kotlin_version = '1.2.71'
        ......
    dependencies {
        .....
        classpath "org.jetbrains.kotlin:kotlin-gradle-plugin:$kotlin_version"
        classpath "org.jetbrains.kotlin:kotlin-android-extensions:$kotlin_version"
        .....
    }
}

2、app/gradle中:

.....
apply plugin: 'kotlin-android'
apply plugin: 'kotlin-android-extensions'
......
kotlin {
    experimental {
        coroutines 'enable'
    }
}
dependencies {
    ....
    implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
    //coroutines的配置重點是這裡
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:0.22.5'
    implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-android:0.22.5'
    .....
}

通過以上兩步,基本完成了對kotlin和coroutines的基礎配置,那麼程式碼裡我們可以直接使用了:

fun getData(){
        //啟動執行緒
        val job = launch {
            //載入資料任務一
            val load = loadData()
            //重新整理介面
            launch(UI) {
                Toast.makeText([email protected], load, Toast.LENGTH_SHORT).show()
                Log.d("launch", load)
            }
              //載入資料任務二
            val select = selectData()
            launch(UI) {
                Toast.makeText([email protected], select, Toast.LENGTH_SHORT).show()
                Log.d("launch", select)
            }
        }
}

//封裝的介面重新整理方法,不用每次都呼叫launch(UI)了
fun launchUi(start: CoroutineStart = CoroutineStart.DEFAULT,
                 parent: Job? = null,
                 block: suspend CoroutineScope.() -> Unit) = launch(UI, start, parent, block)

如果你想讓 getData()的執行進行加鎖,直接這樣就可以:

fun getData()= runBlocking<Unit>{
......
}

如果你想在介面關閉時停止任務:

if (job.isActive)
   job.cancelAndJoin()

如果要在任務裡delay一會:

val launch = launch {
...
//等待執行,相當於Thread.sleep(1000)
delay(1000)
...
}

原理

1、構造引數:
既然coroutines這麼好用,那麼我們來看看它內部到底怎麼實現的:
開啟launch:

public actual fun launch(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context, parent)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun launch(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job =
    launch(context, start, block = block)

/**
 * @suppress **Deprecated**: Use `start = CoroutineStart.XXX` parameter
 */
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
    replaceWith = ReplaceWith("launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun launch(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> Unit): Job =
    launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)

我們會發現它有如上三種呼叫方式,預設我們會使用第一種方式。不管哪種方式都有一個DefaultDispatcher的引數,這個引數就是我們啟動非同步的上下文,繼續進入DefaultDispatcher:

@Suppress("PropertyName")
public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool

到這貌似看出一點端倪了,繼續進入CommonPool:

object CommonPool : CoroutineDispatcher() {
    private var usePrivatePool = false

    @Volatile
    private var _pool: Executor? = null

    private inline fun <T> Try(block: () -> T) = try { block() } catch (e: Throwable) { null }

    private fun createPool(): ExecutorService {
        if (System.getSecurityManager() != null) return createPlainPool()
        val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
            ?: return createPlainPool()
        if (!usePrivatePool) {
            Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
                ?.let { return it }
        }
        Try { fjpClass.getConstructor(Int::class.java).newInstance(defaultParallelism()) as? ExecutorService }
            ?. let { return it }
        return createPlainPool()
    }

    private fun createPlainPool(): ExecutorService {
        val threadId = AtomicInteger()
        return Executors.newFixedThreadPool(defaultParallelism()) {
            Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
        }
    }

    private fun defaultParallelism() = (Runtime.getRuntime().availableProcessors() - 1).coerceAtLeast(1)

    @Synchronized
    private fun getOrCreatePoolSync(): Executor =
        _pool ?: createPool().also { _pool = it }

    override fun dispatch(context: CoroutineContext, block: Runnable) =
        try { (_pool ?: getOrCreatePoolSync()).execute(timeSource.trackTask(block)) }
        catch (e: RejectedExecutionException) {
            timeSource.unTrackTask()
            DefaultExecutor.execute(block)
        }

    // used for tests
    @Synchronized
    internal fun usePrivatePool() {
        shutdown(0)
        usePrivatePool = true
        _pool = null
    }

    // used for tests
    @Synchronized
    internal fun shutdown(timeout: Long) {
        (_pool as? ExecutorService)?.apply {
            shutdown()
            if (timeout > 0)
                awaitTermination(timeout, TimeUnit.MILLISECONDS)
            shutdownNow().forEach { DefaultExecutor.execute(it) }
        }
        _pool = Executor { throw RejectedExecutionException("CommonPool was shutdown") }
    }

    // used for tests
    @Synchronized
    internal fun restore() {
        shutdown(0)
        usePrivatePool = false
        _pool = null
    }

    override fun toString(): String = "CommonPool"
}

到這已經非常清楚了,這是一個由ExecutorService構建的執行緒池,由上面的defaultParallelism()完成。
除此之外,我們看到了DefaultDispatcher繼承了一個CoroutineDispatcher的類:


public expect abstract class CoroutineDispatcher constructor() : AbstractCoroutineContextElement, ContinuationInterceptor {
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    public override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
}

public expect interface Runnable {
    public fun run()
}

@Suppress("PropertyName")
public expect val DefaultDispatcher: CoroutineDispatcher

開啟這個類我們發現就是一些要實現的抽象方法,沒啥別的了。

回到上面開始講的launch的構造方式中,除了CoroutineContext,還有第二個引數start,這個我們通過原始碼可以看出它是一個列舉類,它的意思是選擇協程啟動的模式:DEFAULT、LAZY、ATOMIC、UNDISPATCHED:

public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>) =
        when (this) {
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        }

引數三Job就是我們要構造的返回物件,這裡也就是任務的主要執行處。引數四CoroutineScope作用時任務執行外界可回撥的狀態、上下文物件。

2、任務的執行
構造引數的父類中通過接收子類傳遞的引數再次講引數封裝完整後,最後呼叫start()方式啟動:

val newContext = newCoroutineContext(context, parent)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)

我們進入start裡:

public abstract class AbstractCoroutine<in T>{
........
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        initParentJob()
        start(block, receiver, this)
    }
........
    }

繼續進入initParentJob:

public actual interface Job : CoroutineContext.Element {
......
internal actual fun initParentJobInternal(parent: Job?) {
        check(parentHandle == null)
        if (parent == null) {
            parentHandle = NonDisposableHandle
            return
        }
        parent.start() // make sure the parent is started
        @Suppress("DEPRECATION")
        val handle = parent.attachChild(this)
        parentHandle = handle
        // now check our state _after_ registering (see updateState order of actions)
        if (isCompleted) {
            handle.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    }
......
}

到這裡算是我們能跟蹤的都跟完了。以上是coroutine的整個執行流程,原始碼大概不算太複雜,內部的維護也算很周全,例如加鎖、取消、delay、多工併發它都做了處理,所以我們不妨在開發中嘗試一下。