RxJS 中使用 combineLatest 操作符实现多数据源联合查询

在前端开发中,经常需要联合多个数据源进行查询和处理。传统的方式是通过 Promise.all() 或者 async/await 实现异步并行请求,然后在处理函数中合并数据。但是这种方式存在一些问题:代码复杂度较高,容易出现回调地狱,同时数据源增加时,处理逻辑变得更加复杂。RxJS 的 combineLatest 操作符可以很好地解决这些问题,同时提供了更强大、可组合、可读性更高的功能。

RxJS 简介

RxJS 是 ReactiveX 在 JavaScript 中的实现,它是一种基于观察者模式(Observer Pattern)的编程范式。RxJS 提供了一些常见的操作符,从简单的映射、过滤到更加复杂的操作如合并数据流、错误处理等。RxJS 有以下特点:

  • 响应式编程:RxJS 提供了响应式编程集成方案,便于管理、组合和筛选多个异步数据流。
  • 非阻塞式:RxJS 的所有操作都是在非阻塞式的背景下运行的,可以轻松地添加或删除控件。
  • 基于算子(operator)的语法:RxJS 提供了很多算子,可以轻松处理和管理多个异步数据流。

combineLatest 操作符

combineLatest 操作符比较常见,它可以在多个数据源中发出数据时将其合并在一起。它会订阅所有数据源,并当所有数据源都至少发出了一次数据时发出组合数据。每次数据源中任何一个数据发生更改时,combineLatest 都会重新计算数据并发出更新后的合并数据。

combineLatest 的语法

RxJS 中,combineLatest 操作符的语法如下:

combineLatest<T>(...observables: Array<ObservableInput<T>>): Observable<T[]>;

combineLatest 接收一个可变数量的 Observable(可被订阅数据源)作为参数,并返回一个 Observable,它可以在所有数据源至少发出一次数据时将数据合并成一个数组,并在有新数据时重新发出更新后的结果。

combineLatest 的示例

// 假设我们有两个数据源
const numbers$ = of(1, 2, 3, 4, 5);
const letters$ = of('a', 'b', 'c', 'd');

// 使用 combineLatest 将它们合并在一起
const combined$ = combineLatest(numbers$, letters$);

// 订阅结果
combined$.subscribe(([numbers, letters]) => {
  console.log(numbers, letters);
});

// 输出
// [ 5, 'd' ]

在这个例子中,我们将一个发出数字序列的 Observable 和一个发出字母序列的 Observable 合并在一起,并在两者中至少发出一次数据时打印输出结果 [5, 'd']

RxJS 中使用 combineLatest 实现多数据源联合查询

使用 RxJS 中的 combineLatest 操作符可以方便地联合多个数据源进行查询和处理。假设我们需要从接口 A 和接口 B 获取数据,并将它们合并在一起。

先创建两个异步请求

我们先把两个异步请求分别封装成 Observable:

// 获取数据的异步请求函数
function getDataA() {
  return of({x: 11, y: 22}).pipe(delay(500));
}

function getDataB() {
  return of({z: 33}).pipe(delay(1000));
}

// 将异步请求转为 Observable
const sourceA$ = from(getDataA());
const sourceB$ = from(getDataB());

代码中,我们使用了 of()from() 操作符分别将异步请求响应值(promise、xhr)转换为 Observable。

使用 combineLatest 合并数据源

// 在 sourceA$ 和 sourceB$ 上使用 combineLatest 操作符
const combined$ = combineLatest(sourceA$, sourceB$);

我们使用了 combineLatest() 将两个 Observable 分别作为参数传递,这样它们就会合并到一起并通过计算发出更新后的结果。

处理合并后的数据

// 在订阅者中获取合并后的数据
combined$.subscribe(([dataA, dataB]) => {
  console.log('Got combined data:', {dataA, dataB});
});

经过 combineLatest 合并后,订阅者会接收到一个数组,包含了在 sourceA$ 和 sourceB$ 上发射的最近数据。我们可以从数组中获取数据并进行进一步的处理。

完整的示例代码

import {combineLatest, from, of} from 'rxjs';
import {delay} from 'rxjs/operators';

// 获取数据的异步请求函数
function getDataA() {
  return of({x: 11, y: 22}).pipe(delay(500));
}

function getDataB() {
  return of({z: 33}).pipe(delay(1000));
}

// 将异步请求转为 Observable
const sourceA$ = from(getDataA());
const sourceB$ = from(getDataB());

// 在 sourceA$ 和 sourceB$ 上使用 combineLatest 操作符
const combined$ = combineLatest(sourceA$, sourceB$);

// 在订阅者中获取合并后的数据
combined$.subscribe(([dataA, dataB]) => {
  console.log('Got combined data:', {dataA, dataB});
});

总结

RxJS 的 combineLatest 操作符提供了一种方便的方式来同时查询多个数据源。由于它是基于响应式编程范式构建的,所以我们可以更容易地组合和设计我们的异步操作。

使用 combineLatest 可以减少代码复杂性和深度嵌套,同时它是可组合、可读性高的,并且易于扩展。因此,如果你使用 RxJS,我强烈建议你在需要联合多个数据源时优先考虑使用它。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65b4c9edadd4f0e0ffda4ed7