RxJS 深入浅出教程:RxJS 的基本概念和操作符

关于 RxJS

RxJS 是一个基于流(Stream)和响应式编程(Reactive Programming)理念的库,用于处理异步事件和数据流的JavaScript 库。它的核心思想是将异步事件作为数据流进行处理,并使用强大的操作符来处理数据流。

RxJS 可以使用于各种 JavaScript 环境,包括 Web 网页应用程序、Node.js 应用程序等。

基本概念

Observable 可观察对象

RxJS 的核心就是 Observable 可观察对象。Observable 总是存在并等待事件的到来。事件可以是异步的,如网络请求、按钮点击等。

Observable 可以通过注册观察者(Observer)的方式对这些事件进行监听。

// 创建一个 Observable 对象
const observable = Rx.Observable.create((observer) => {
  observer.next(1);
  observer.next(2);
  observer.next(3);
  observer.complete();
});

// 注册观察者
observable.subscribe({
  next: (value) => console.log(value),
  complete: () => console.log('completed.')
});
// Output: 1 2 3 completed.

上面的代码中创建了一个 Observable 对象,其中使用了 Observable.create() 方法来创建 Observable。在这个 Observable 对象中我们通过 observer.next() 方法发出了三个事件值,然后使用 observer.complete() 表示所有事件已经发送完成。

最后我们使用 subscribe() 方法注册了一个观察者,subscribe() 方法接收一个包含不同回调函数的对象作为参数,其中 next() 方法会在每个事件值被触发时调用,而 complete() 方法会在所有事件值已被触发后调用。

Observer 观察者

Observer 观察者是一个对象,它包含一系列的回调函数,能够对 Observable 对象发出的事件进行监听。

const observer = {
  next: (value) => console.log(`received value: ${value}`),
  error: (error) => console.log(`error: ${error}`),
  complete: () => console.log('completed.')
};

Subscription 订阅

Subscription 订阅表示 Observable 对象与观察者之间的连接。当订阅被取消时,观察者将停止接收来自 Observable 对象的事件。

const observable = Rx.Observable.interval(1000);
const subscription = observable.subscribe(console.log);

setTimeout(() => {
  subscription.unsubscribe();
}, 5000);

上面的代码中,我们创建了一个 Observable 对象,使用了 Rx.Observable.interval() 方法来每隔 1 秒发出一个自增数字事件值,并将该 Observable 的事件值注册给观察者 console.log。我们使用 setTimeout() 方法在 5 秒后取消订阅。这意味着观察者将不再接收来自 Observable 的事件值。

Subject 主题

Subject 主题是一种特殊的 Observable,它允许我们在多个观察者之间共享同一份数据,并同时触发多个事件值。

const subject = new Rx.Subject();

subject.subscribe({ next: (value) => console.log(`Observer A: ${value}`) });

subject.next('Hello');

subject.subscribe({ next: (value) => console.log(`Observer B: ${value}`) });

subject.next('RxJS');
// Output: Observer A: Hello Observer B: Hello Observer A: RxJS Observer B: RxJS

上面的代码中,我们先创建了一个 Subject 主题,并注册了两个观察者。我们然后可以通过 next() 方法向主题发送事件值,每个观察者都会收到该事件值。

操作符

RxJS 的操作符是对 Observable 的变换的函数。通过组合操作符以创建应用程序所需的复杂行为。

map 映射

map() 操作符对 Observable 中的每个事件值进行映射,返回一个新的 Observable,其中包含处理后的新事件值。

const observable = Rx.Observable.from([1, 2, 3]);

observable
  .map((x) => x * 2)
  .subscribe(console.log);
// Output: 2 4 6

上面的代码中,我们创建了一个 Observable,使用了 Rx.Observable.from() 方法将一个数组转化为 Observable,然后使用 map() 操作符将每个事件值乘以 2,最后将处理后的新事件值注册给 console.log 进行输出。

filter 过滤

filter() 操作符用于仅仅向下传递那些满足某些条件的事件值,返回一个新的 Observable 包含过滤后的事件值。

const observable = Rx.Observable.from([1, 2, 3, 4, 5]);

observable
  .filter((x) => x % 2 === 0)
  .subscribe(console.log);
// Output: 2 4

上面的代码中,我们创建了一个 Observable,使用了 Rx.Observable.from() 方法将一个数组转化为 Observable,然后使用 filter() 操作符将只让偶数通过过滤,最后将处理后的新事件值注册给 console.log 进行输出。

merge 合并

merge() 操作符将多个 Observable 合并为一个 Observable,并将它们的事件值按照时间发生先后顺序逐一合并起来。

const observableA = Rx.Observable.interval(1000).map((x) => `observableA ${x}`);
const observableB = Rx.Observable.interval(2000).map((x) => `observableB ${x}`);

Rx.Observable.merge(observableA, observableB)
  .subscribe(console.log);
// Output: observableA 0 observableB 0 observableA 1 observableA 2 observableB 1 observableA 3 observableB 2 observableA 4 observableA 5 observableB 3 observableA ...

上面的代码中,我们创建了两个 Observable,一个每隔 1 秒发出一个事件值,一个每隔 2 秒发出一个事件值。我们使用 Rx.Observable.merge() 方法将它们进行合并,最后将所有处理后的事件值注册给 console.log 进行输出。

总结

本文介绍了 RxJS 的基本概念和操作符。Observable 可观察对象是 RxJS 的核心概念,允许观察者注册对事件值进行监听。操作符用于对 Observable 中的事件值进行处理和转换,可以通过组合操作符创建出各种复杂行为的应用程序。使用 RxJS 开发应用程序可以使开发者更轻松地处理异步事件和数据流。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65a34553add4f0e0ffb62802


纠错反馈