RxJS的另外四種實現方式(六)——使用Stream類實現
該實現方式與之前幾種不同的,該實現方式僅針對Nodejs環境。在Nodejs環境中,提供了Stream類,包括Readable、Transform、Writeable等子類都是可擴充套件的。從字面上看,正好對應Rx中的生產者、傳遞者、消費者。
實現該庫的起因是,一次在Nodejs中需要在koa框架裡面提供event-stream功能,目前除了IE瀏覽器外其他瀏覽器都支援了服務端事件推送,這個功能可以很好的代替輪詢。webpack用的熱更新就是通過這個功能實現的。
言歸正傳,首先得實現生產者,我們先來看interval
class Interval extends Readable {
constructor (period) {
super({ objectMode: true })
this.period = period
this.i = 0
}
_read(size) {
setTimeout(() => this.push(this.i++), this.period)
}
}
exports.interval = period => new Interval(period)
說明一下,建構函式傳入objectMode:true的物件是讓stream處於物件模式,而不是二進位制流模式。_read函式必須覆蓋父類,否則出錯,當有訂閱者連線上來後,就會呼叫_read方法。我們在這個方法裡面傳送資料,即呼叫push方法,將資料傳送給流的接收者。
當呼叫過push方法後,後面的接收者如果呼叫了callback回撥,則表示資料消費完畢,會再次呼叫_read方法,直到push(null)表示生產者已經complete
FromArray也十分簡單易讀
class FromArray extends Readable {
constructor(array) {
super({ objectMode: true })
this.array = array
this.pos = 0
this.size = array.length
}
_read(size) {
if (this.pos < this.size) {
this.push(this.array[this.pos++])
} else
this.push(null)
}
}
exports.fromArray = array => new FromArray(array)
下面要實現一個轉換器(操作符)Filter
class Filter extends Transform {
constructor(f) {
super({ readableObjectMode: true, writableObjectMode: true })
this.f = f
}
_transform(data, encoding, callback) {
const f = this.f
if (f(data)) {
this.push(data);
}
callback();
}
_flush(callback) {
callback()
}
}
exports.filter = f => new Filter(f)
這時候我們需要覆蓋_transform、_flush函式,同樣的,push方法會讓資料流到下面的流中,而callback回撥會使得上一個流繼續傳送資料。
最後我們來實現Subscriber
class Subscriber extends Writable {
constructor(n, e, c) {
super({ objectMode: true })
this.n = n
this.e = e
this.c = c
}
_write(chunk, encoding, callback) {
this.n(chunk)
callback(null)
}
_final(callback) {
this.c()
callback()
}
}
exports.subscribe = (n, e = noop, c = noop) => new Subscriber(n, e, c)
Subscriber是一個可寫流,我們必須覆蓋_write方法用於消費資料,_final方法用於complete事件處理。這裡沒有實現error事件。有興趣的同學可以思考如何實現。
最後我們需要把各種stream串起來,變成一個長長的水管
exports.pipe = pipeline || ((first, ...cbs) => cbs.reduce((aac, c) => aac.pipe(c), first));
高版本的Nodejs已經提供了pipeline方法,可以直接使用,低版本的話,可以用上面的方法進行連線。
至此,我們已經使用Nodejs提供的Stream類實現了Rx的基本邏輯。(完)