1. 程式人生 > >RxSwift 中的排程器

RxSwift 中的排程器

與 ReactiveCocoa 相比,Rx 的一大優勢就是更豐富的併發模型。提到併發,就不得不提多執行緒。在 RxSwift 中,與執行緒對應的概念就是排程器,本文就排程器做些介紹,包括併發排程器、序列排程器、RxSwift 內建的排程器,及自定義排程器。

排程器抽象出了執行工作的機制,可以不怎麼準確的認為排程器對應原本的執行緒。

observeOnsubscribeOn 這兩個操作符可以與排程器配合使用。

如果你想在一個不同的排程器執行工作,那就使用 observeOn(scheduler) 操作符。

如果沒有顯式指定 observerOn ,那麼工作會在產生元素的排程器來執行。

下邊是一個使用 observeOn

的例子:

sequence1
  .observeOn(backgroundScheduler)
  .map { n in
      print("This is performed on the background scheduler")
  }
  .observeOn(MainScheduler.instance)
  .map { n in
      print("This is performed on the main scheduler")
  }

如果想要序列在指定排程器上生成(subscribe 方法)和 dispose,可以使用 subscribeOn(scheduler)

如果沒有明確指定 subscribeOn,那麼將在呼叫 subscribe(onNext:)subscribe 的同一排程器上呼叫 subscribe 閉包(傳遞給 Observable.create 的閉包)。

如果沒有明確指定 subscribeOn,那麼將在啟動 disposing 的同一排程器上呼叫 dispose 方法。

簡而言之,如果沒有顯式的選擇排程器,那麼將在當前排程器上呼叫這些方法。

序列排程器 vs 併發排程器

由於排程器可以是任何東西,並且所有轉換序列的操作副都需要有額外的保證,因此你建立的是哪種排程器非常重要。

這裡的保證是指,對所有的序列(Observable)而言,不論它在那個執行緒上產生元素,如果序列通過 observer.ron(.next(nextElement))

將一個元素髮送給觀察者,那麼序列在 observer.on 方法執行結束前不能傳送下一個元素。

如果 .next 方法沒有執行完成,序列也不能傳送終止命令,如 .completed.error

如果排程器是併發的,Rx 的 observeOnsubscribeOn 操作符將確保一切正常。

如果你使用Rx可以證明是序列的排程器,它能夠執行額外的優化。

在序列排程器的情況下, observeOn 被優化為一個簡單的 dispatch_async 呼叫。

自定義排程器

除了當前的排程器,你也可以實現自己的排程器。

如果你想要描述立即執行工作的排程器,可以實現 ImmediateScheduler 協議。

public protocol ImmediateScheduler {
    func schedule<StateType>(state: StateType, action: (/*ImmediateScheduler,*/ StateType) -> RxResult<Disposable>) -> RxResult<Disposable>
}

如果你想要支援基於事件的操作,那麼你可以實現 Scheduler 協議:

public protocol Scheduler: ImmediateScheduler {
    associatedtype TimeInterval
    associatedtype Time

    var now : Time {
        get
    }

    func scheduleRelative<StateType>(state: StateType, dueTime: TimeInterval, action: (StateType) -> RxResult<Disposable>) -> RxResult<Disposable>
}

如果你想有周期執行的能力,你可以通過實現 PeriodicScheduler 協議來通知 Rx。

public protocol PeriodicScheduler : Scheduler {
    func schedulePeriodic<StateType>(state: StateType, startAfter: TimeInterval, period: TimeInterval, action: (StateType) -> StateType) -> RxResult<Disposable>
}

內建的排程器

上邊我們提到 Rx 可以使用所有型別的排程器,但如果 Rx 可以證明排程器是序列的,那麼會執行額外的優化

  1. CurrentThreadScheduler:它是一個序列排程器,表示當前執行工作的排程器。
  2. MainScheduler:抽象了需要在主執行緒執行的工作,當然也是一個序列排程器。
  3. SerialDispatchQueueScheduler:是一個序列排程器,抽象需要在一個 dispatch_queue_t 上執行的工作,它將確保即時傳遞併發排程佇列(concurrent dispatch queue),它也會轉換為序列佇列。主排程器 也是 SerialDispatchQueueScheduler 的一個示例。
  4. ConcurrentDispatchQueueScheduler:是一個併發排程器,抽象需要在一個 dispatch_queue_t 上執行的工作。當一些任務需要在後臺執行時,使用這個排程器。
  5. OperationQueueScheduler:抽象需要在 NSOperationQueue 上執行的工作。適用於需要在後臺執行大量工作,並希望通過使用 maxConcurrentOpeartionCount 來調整併發處理的情況。

本文中的部分表示 與 Rx 中的對照:

本文中的表示 Rx中的表示
排程器 scheduler
序列排程器 serial scheduler
併發排程器 concurrent scheduler