RxJS 是一个强大的响应式编程库,提供了多种操作符帮助开发者处理异步数据流,使得代码更加简洁和易于维护。组合操作符是其中一类操作符,用于将多个数据流进行组合并输出一个新的数据流,本文将详细介绍 RxJS 中的组合操作符。
组合操作符分类
RxJS 中的组合操作符可分为静态操作符和实例操作符两类:
- 静态操作符:由 RxJS 对象直接调用,不需要先创建一个 Observable 实例。例如
Rx.Observable.combineLatest()
。 - 实例操作符:调用 Observable 实例中的方法。例如
Observable.prototype.merge()
。
组合操作符详解
merge
merge
将多个 Observable 中的值组合成一个 Observable,并发射所有 Observable 中的值。当有多个 Observable 同时发射值时,merge
会立即将这些值发射出去。
const observable1 = Rx.Observable.interval(1000).take(5).map(x => 'a' + x); const observable2 = Rx.Observable.interval(500).take(3).map(x => 'b' + x); observable1.merge(observable2).subscribe(console.log); // 输出结果:a0, b0, a1, b1, b2, a2, a3, a4
在上面的例子中,observable1
每隔 1s 发射一个值,共发射 5 次,发射的值为 'a0'、'a1'、'a2'、'a3'、'a4',observable2
每隔 0.5s 发射一个值,共发射 3 次,发射的值为 'b0'、'b1'、'b2',使用 merge
将两个 Observable 中的值组合成一个 Observable,并立即发射所有值,最后输出的结果为 'a0'、'b0'、'a1'、'b1'、'b2'、'a2'、'a3'、'a4'。
concat
concat
将多个 Observable 中的值按照顺序串联在一起,并发射整个 Observable 的值序列。当一个 Observable 完全发射完值后,concat
才会订阅并发射下一个 Observable 中的值。
const observable1 = Rx.Observable.interval(1000).take(5).map(x => 'a' + x); const observable2 = Rx.Observable.interval(500).take(3).map(x => 'b' + x); observable1.concat(observable2).subscribe(console.log); // 输出结果:a0, a1, a2, a3, a4, b0, b1, b2
在上面的例子中,observable1
每隔 1s 发射一个值,共发射 5 次,发射的值为 'a0'、'a1'、'a2'、'a3'、'a4',observable2
每隔 0.5s 发射一个值,共发射 3 次,发射的值为 'b0'、'b1'、'b2',使用 concat
将两个 Observable 中的值按照顺序串联在一起,并发射整个 Observable 的值序列,最后输出的结果为 'a0'、'a1'、'a2'、'a3'、'a4'、'b0'、'b1'、'b2'。
zip
zip
将多个 Observable 中发射的对应值合并成一个新的 Observable 并发射出去,只有当所有 Observable 都发射了值,它才会发射一个由所有值组成的数组。
const observable1 = Rx.Observable.interval(1000).take(3); const observable2 = Rx.Observable.interval(500).take(4); observable1.zip(observable2, (x, y) => x + y).subscribe(console.log); // 输出结果:0, 1, 2
在上面的例子中,observable1
每隔 1s 发射一个值,共发射 3 次,发射的值为 0、1、2,observable2
每隔 0.5s 发射一个值,共发射 4 次,发射的值为 0、1、2、3,使用 zip
将 observable1
和 observable2
发射的值逐一合并成一个数组,并发射出去,最后输出的结果为 0、1、2。
combineLatest
combineLatest
将多个 Observable 中最新的值按顺序组合成一个数组并发射出去,当任意一个 Observable 发射了新的值时,combineLatest
会将最新的值按顺序组合成数组并发射出去。。
const observable1 = Rx.Observable.interval(1000).take(3); const observable2 = Rx.Observable.interval(500).take(4); observable1.combineLatest(observable2, (x, y) => x + y).subscribe(console.log); // 输出结果:2, 3, 4, 5, 5, 6, 6, 7, 8, 9
在上面的例子中,observable1
每隔 1s 发射一个值,共发射 3 次,发射的值为 0、1、2,observable2
每隔 0.5s 发射一个值,共发射 4 次,发射的值为 0、1、2、3,使用 combineLatest
将 observable1
和 observable2
的最新值组合成一个数组,并发射出去,每次都是最新的值进行组合。因此最后输出的结果为: '2, 3'、'2, 4'、'3, 4'、'3, 5'、'4, 5'、'4, 6'、'5, 6'、'5, 7'、'6, 7'、'6, 8'、'7, 8'、'7, 9'、'8, 9'。
forkJoin
forkJoin
将多个 Observable 中最后发射的值按顺序组合成一个数组并发射出去,当所有 Observable 都完成时,才会发射一个由所有值组成的数组。
const observable1 = Rx.Observable.of(1, 2, 3); const observable2 = Rx.Observable.interval(1000).take(3); const observable3 = Rx.Observable.from([4, 5, 6]).delay(2000); Rx.Observable.forkJoin(observable1, observable2, observable3).subscribe(console.log); // 输出结果: [3,2,6]
在上面的例子中,observable1
发射值 1、2、3,observable2
每隔 1s 发射一个值,共发射 3 次,发射的值为 0、1、2,observable3
发射值 4、5、6,使用 forkJoin
等待所有 Observable 完成后,将它们最后发射的值组合成一个数组,并发射出去,最后输出的数组为 [3,2,6]。
总结
本文介绍了 RxJS 中的组合操作符,包括 merge
、concat
、zip
、combineLatest
和 forkJoin
,同时提供了示例代码方便理解和学习。在实际开发中,合理应用这些组合操作符可以提高代码的可读性、简洁性和易于维护性。
来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/64d10e4fb5eee0b525812662