1. 程式人生 > >RxJS的另外四種實現方式(三)——性能最高的庫

RxJS的另外四種實現方式(三)——性能最高的庫

如何 www table fas set export llb const events

接上篇 RxJS的另外四種實現方式(二)——代碼最小的庫(續)

代碼最小的庫rx4rx-lite雖然在性能測試中超過了callbag,但和most庫較量的時候卻落敗了,於是我下載了most庫,要解開most庫性能高的原因。
我們先上一組測試數據,這是在我的windows10 上面跑的

dataflow for 1000000 source events

lib op/s samples
rx4rx-lite 11.29 op/s ± 1.47% (56 samples)
rx4rx-fast 22.56 op/s ± 1.77% (57 samples)
cb-basics 9.56 op/s ± 1.73% (49 samples)
xstream 5.37 op/s ± 0.68% (30 samples)
most 17.32 op/s ± 1.93% (82 samples)
rx 6 6.28 op/s ± 3.10% (35 samples)

經過我的不懈努力終於把性能超過了most庫。
我先介紹一下fast庫的工作原理,下一篇文章我再介紹如何從most庫中找到性能提升的要領。

在fast庫中,我們開始使用一個基類作為一切操作符的父類,名為Sink。

class Sink {
    constructor(sink, ...args) {
        this.defers = new Set()//用於存放需要釋放的操作
        this.sink = sink
        this.init(...args)
        if (sink) sink.defers.add(this)//用於釋放的連鎖反應
    }
    init() {

    }
    //是否連鎖釋放
    set disposePass(value) {
        if (!this.sink) return
        if (value)
            this.sink.defers.add(this)
        else this.sink.defers.delete(this)
    }
    //數據向下傳遞
    next(data) {
        this.sink && this.sink.next(data)
    }
    //完成/error事件向下傳遞
    complete(err) {
        this.sink && this.sink.complete(err)
        this.dispose(false)
    }
    error(err) {
        this.complete(err)
    }
    //釋放即取消訂閱功能
    dispose(defer = true) {
        this.disposed = true
        this.complete = noop
        this.next = noop
        this.dispose = noop
        this.subscribes = this.subscribe = noop
        defer && this.defer() //銷毀時終止事件源
    }
    defer(add) {
        if (add) {
            this.defers.add(add)
        } else {
            this.defers.forEach(defer => {
                switch (true) {
                    case defer.dispose != void 0:
                        defer.dispose()
                        break;
                    case typeof defer == ‘function‘:
                        defer()
                        break
                    case defer.length > 0:
                        let [f, thisArg, ...args] = defer
                        if (f.call)
                            f.call(thisArg, ...args)
                        else f(...args)
                        break
                }
            })
            this.defers.clear()
        }
    }
    subscribe(source) {
        source(this)
        return this
    }
    subscribes(sources) {
        sources.forEach(source => source(this))
    }
}

為了性能,代碼量稍微有點多了。原本傳入next和complete函數,現在變為傳入sink對象,這裏十分類似向Observable傳入Observer對象。但是與rxjs不同的是,我們的Observable仍然是一個函數,我們看一個從數組構造Observable的代碼

exports.fromArray = array => sink => {
    sink.pos = 0
    const l = array.length
    while (sink.pos < l && !sink.disposed) 
        sink.next(array[sink.pos++])
    sink.complete()
}

這個pos為什麽不直接定義一個變量呢?let pos = 0這是常規做法,這裏把變量定義到了對象的屬性上面,純粹是為了提高一點點性能,經過測試發現,直接訪問(讀寫操作)局部變量,比訪問對象的屬性要慢一些。

由於大部分的操作符都是相同的調用方式,所以可以抽象成一個函數

exports.deliver = Class => (...args) => source => sink => source(new Class(sink, ...args))

take操作符就變成了這樣

class Take extends Sink {
    init(count) {
        this.count = count
    }
    next(data) {
        this.sink.next(data)
        if (--this.count === 0) {
            this.defer()
            this.complete()
        }
    }
}
exports.take = deliver(Take)

而我們的subscriber就變成了這樣

exports.subscribe = (n, e = noop, c = noop) => source => {
        const sink = new Sink()
        sink.next = n
        sink.complete = err => err ? e(err) : c()
        source(sink)
        return sink
    }

至此fast庫的基本構建邏輯已經展示完畢。
至於為什麽這麽快,就請聽下回分解。
(未完待續)

RxJS的另外四種實現方式(三)——性能最高的庫