RxJS 中的错误处理:使用 catchError 和 retryWhen 操作符

RxJS 是一个强大的 JavaScript 库,用于异步编程。它使我们能够轻松地处理 DOM 事件、Xhr 请求、WebSocket 和其他种类的异步操作。但是,由于异步操作的不可预知性和不确定性,我们需要一种方法来处理和管理这些操作中可能出现的错误。这就是 RxJS 中的错误处理。

RxJS 提供了两个主要的错误处理操作符:catchError 和 retryWhen。这些操作符让您能够在 RxJS 流中对错误进行更细粒度的处理和解决方案。

catchError 操作符

catchError 操作符用于捕获和处理 RxJS 流中的错误。如果您没有为流中的错误提供明确定义的处理方法,错误会沿着流传递并且最终到达“全局”错误处理程序。由此导致的问题是,您失去了对错误的控制,而无法采取适当的措施来解决它们。

catchError 操作符解决了这个问题。它允许您针对流中发生的每个错误提供一个自定义的错误处理程序。您可以使用 catchError 操作符处理单个错误,复杂和多级错误处理都是可能的。

下面是 catchError 操作符的语法:

observable.pipe(catchError((error) => {
  // 在这里处理错误
  return someObservable;
}));

在这种情况下,当发生错误时会执行 catchError 函数内的回调函数。该函数可以根据错误类型、错误代码或错误消息将流重定向到不同的流中,然后可以在新的流中重新启用或提供新的流或数据。

catchError 操作符示例

import { of } from 'rxjs';
import { catchError } from 'rxjs/operators';

const throwError = () => {
  throw new Error('Something went wrong');
};

const source$ = of('A', 'B', throwError(), 'C', 'D');

source$
  .pipe(
    catchError((error) => {
      console.warn(error);
      return of('X', 'Y', 'Z');
    })
  )
  .subscribe((value) => console.log(value));

在此示例中,我们使用 of 操作符创建了一个执行 A、B、throwError()、C 和 D 操作的 observable。throwError() 操作故意没有提供实际的实现,并抛出了一个错误。

我们通过在源 observable 上执行 catchError 操作,并在 catchError 中记录错误和提供一个新的 observable,对这个错误进行了处理。在浏览器控制台中,会打印出错误消息“Something went wrong”,然后源 observable 的值流将被重定向并用 X、Y、Z 替换。在最终的订阅中,我们使用 console.log 打印出发出的值。

retryWhen 操作符

retryWhen 操作符是另一个有用的错误处理方法,它对同一错误的重试提供了细粒度的控制。它让您可以指定在错误发生时进行重试的条件和策略,从而更好地控制流的行为。

retryWhen 操作符将源 observable 转换为一个新的 observable,该 observable 在过了一定的时间后,重新订阅源 observable。在这个新的订阅中,如果源 observable 再次发出错误,retryWhen 可以在满足条件时重新订阅源 observable。

下面是 retryWhen 操作符的语法:

observable.pipe(retryWhen((errors) => {
  // 在这里处理错误
  return someObservable;
}));

这里传递给 retryWhen 函数的参数 errors 是一个错误 observable,它接收来自 retryWhen 下游 observable 中的错误。在 retryWhen 中,错误 observable 的行为非常类似 catchError 中的 observable,重新订阅源 observable 后可以提供一个流或数据。

retryWhen 操作符示例

import { interval, timer } from 'rxjs';
import { retryWhen, mergeMap } from 'rxjs/operators';

const source$ = interval(1000);

source$
  .pipe(
    mergeMap((value) => {
      if (value > 5) {
        return timer(4000); // 在一段时间后发生错误
      }
      return of(value);
    }),
    retryWhen((errors) =>
      errors.pipe(
        mergeMap((error, index) =>
          index < 2 ? of(error) : throwError('stopping retry')
        )
      )
    )
  )
  .subscribe({
    next: (value) => console.log(value),
    error: (error) => console.error(error),
  });

在这个示例中,我们用 4 秒后的 timer 替换了 value 大于 5 的值。retryWhen 下游 observable 在接收到错误时会重新订阅源 observable,但是该示例只对前两个重试尝试进行了重试。在第三次重试之后,我们使用 throwError 操作停止了源 observable。最终,在订阅中,我们响应源 observable 的错误,并打印出错误消息。

总结

在 RxJS 中使用 catchError 和 retryWhen 操作符时,可以捕获和处理异步流中可能出现的错误。这些操作符可以让您将错误处理颗粒度提高到一个新高度,以便更好地控制和管理错误。

使用 catchError 和 retryWhen 操作符的关键在于提供适当的错误处理程序和策略。这样可以确保您在出现错误时可以采取适当的措施,以便优化流的行为以适应您的应用程序的需要。

来源:JavaScript中文网 ,转载请注明来源 本文地址:https://www.javascriptcn.com/post/65922035eb4cecbf2d706f64


纠错反馈