RxJS 中的 Subject 源码分析

阅读时长 10 分钟读完

Subject 是 RxJS 的重要概念之一,它允许 Observable 与 Observer 之间进行交互。事实上,Subject 本身既充当 Observable,又充当 Observer。本文将对 RxJS 中的 Subject 进行深入分析,并给出相应的示例代码,以帮助读者更好地理解和使用 Subject。

Subject 的基本概念

Subject 是一个可观察对象(Observable),同时也是一个观察者(Observer)。它可以订阅(subscribe)其他 Observable,并且可以将其收到的数据推送给它自己订阅的观察者。Subject 实例化后,即可被认为是一个事件流(Event Stream),它会接收来自各种其他 Observable 的数据,并按照特定的策略将这些数据传递给自己的订阅者(Subscribers)。

例如下面的代码示例中,我们通过 Subject 的 next 方法推送了一些数据,并用自己的 subscribe 方法订阅了这个 Subject 对象:

-- -------------------- ---- -------
------ - ------- - ---- -------

----- ------- - --- ------------------

-------------------
  ----- ----- -- ------------------------- -- --------
---

---------------- -- ------------ -- -

-------------------
  ----- ----- -- ------------------------- -- --------
---

---------------- -- ------------ -- -- ------------ -- -

上面的代码中,我们创建了一个名为 subject 的 Subject 对象。我们使用 subject.subscribe 方法注册了两个观察者,分别是 Subscription A 和 Subscription B。然后我们使用 subject.next(1) 方法将数值 1 推送到了 Subject 中,由于 subject 是一个 Subject 而不是普通的 Observable,所以这个数值会分别被推送到 A 和 B 的观察者中。

接着,我们再次使用 subject.next(2) 方法将数值 2 推送到了 Subject 中。这一次,由于 Subject 已经有了两个观察者,所以两个观察者都能够接收到数值 2。这就是 Subject 的基本用法。

Subject 的类型

Subject 有几种不同的类型,每种类型都有着不同的表现方式和应用场景。下面是 RxJS 中常用的几种 Subject 类型:

BehaviorSubject

BehaviorSubject 是一种特殊的 Subject,它会对新的订阅者发出当前值的最新版本。例如下面的代码示例中,我们创建了一个名为 subject 的 BehaviorSubject 对象,并在初始化时将其值设置为 0。我们使用 subject.subscribe 方法注册了两个观察者:

-- -------------------- ---- -------
------ - --------------- - ---- -------

----- ------- - --- -------------------

-------------------
  ----- ----- -- ------------------------- -- --------
---

---------------- -- ------------ -- -

-------------------
  ----- ----- -- ------------------------- -- --------
---

---------------- -- ------------ -- -- ------------ -- -

由于 BehaviorSubject 会对新的订阅者发出当前值的最新版本,所以 Subscription B 在注册后立即收到了数值 1。这就是 BehaviorSubject 的特殊之处。

ReplaySubject

ReplaySubject 是一种会记录并回放所有数据的 Subject。例如下面的代码示例中,我们创建了一个名为 subject 的 ReplaySubject 对象,并在参数中指定了缓存区的大小为 2。我们使用 subject.subscribe 方法注册了两个观察者:

-- -------------------- ---- -------
------ - ------------- - ---- -------

----- ------- - --- -----------------

-------------------
  ----- ----- -- ------------------------- -- --------
---

----------------

-------------------
  ----- ----- -- ------------------------- -- --------
---

----------------
----------------

-- ------- ------------ -- -- ------------ -- -- ------------ -- -- ------------ -- -

由于我们将 ReplaySubject 的缓存区大小设为了 2,因此它只会保留 2 个最新的数据,即数值 2 和数值 3。当 Subscription B 注册后,它会首先收到缓存区中的数值 2 和数值 3,然后依次收到后续的所有值。这就是 ReplaySubject 的特殊之处。

AsyncSubject

AsyncSubject 是一种特殊的 Subject,它只会在数据流结束之后,再将最后一个值发送给订阅者。例如下面的代码示例中,我们创建了一个名为 subject 的 AsyncSubject 对象,并在完成前通过 subject.next 方法推送了数值 1 和 2。我们使用 subject.subscribe 方法注册了一个观察者:

-- -------------------- ---- -------
------ - ------------ - ---- -------

----- ------- - --- ---------------

----------------
----------------
-------------------

-------------------
  ----- ----- -- ------------------------- -- --------
---

-- ------- ------------ -- -

由于 AsyncSubject 只会在数据流结束之后,再将最后一个值发送给订阅者,因此 Subscription A 只接收到了数值 2。这就是 AsyncSubject 的特殊之处。

Subject 的源码分析

从源码的角度来看,Subject 其实是一个继承了 Observable 和 Observer 两个接口的类。下面是 Subject 的定义:

实例化

当我们创建一个 Subject 实例时,其内部会调用父类 Observable 的构造函数来完成初始化。具体而言,Subject 会创建一个名为 observers 的属性,并将其初始化为空数组。这个 observers 数组是用来保存所有注册过的观察者的,每个观察者都是一个 Observer 对象。

订阅

当我们使用 subscribe 方法订阅一个 Subject 对象时,Subject 会将这个订阅者保存到 observers 数组中,并返回一个名为 Subscription 的对象,用来表示这个订阅者。

-- -------------------- ---- -------
------ ----- ---------- ------- ------------- ---------- ---------------- -
  -- ---

  -------------------- -------------------- ------------ -
    ----- ------------ - --------------------------
    -------------------------------------------
    ------ -------------
  -
-

需要注意的是,Subject 的 subscribe 方法并没有实现 Observable 的 subscribe 方法,而是调用了父类 Observable 的 subscribe 方法。这是因为 Subject 在订阅时需要保存订阅者对象,所以需要使用自己独有的方式来处理订阅。

推送数据

当我们使用 Subject 的 next 方法推送一些数据时,Subject 会依次遍历 observers 数组中的所有观察者,并将数据推送给它们。具体而言,Subject 会调用数组中每个观察者的 next 方法来推送数据给它们。

-- -------------------- ---- -------
------ ----- ---------- ------- ------------- ---------- ---------------- -
  -- ---

  ------------ --- ---- -
    -- ------------- -
      ----- --- --------------------------
    -

    ----- - --------- - - -----
    ----- --- - -----------------
    ----- ---- - ------------------

    --- ---- - - -- - - ---- ---- -
      ---------------------
    -
  -
-

需要注意的是,Subject 推送数据时是按照订阅的顺序来推送的。也就是说,先订阅的观察者先收到数据,后订阅的观察者后收到数据。

销毁

当我们使用 Subject 的 unsubscribe 方法来取消订阅时,Subject 会遍历 observers 数组中的所有观察者,并调用它们的 unsubscribe 方法来完成取消订阅操作。同时,Subject 会将 observers 数组清空,并将自身的 closed 属性设为 true,表示它已经被销毁。

-- -------------------- ---- -------
------ ----- ---------- ------- ------------- ---------- ---------------- -
  -- ---

  -------------- ---- -
    -------------- - -----
    ----------- - -----
    -------------- - ------
  -

  --------- --------------- ---- -
    -- -- --------- ----------- ----------- --
    -------------- - ------
  -
-

需要注意的是,Subject 销毁时会同时销毁它所有的观察者。因此,如果我们在使用 Subject 时不小心传入了一些持久化的观察者,这些观察者也会被自动销毁。

总结

本文对 RxJS 中的 Subject 进行了深入分析,并给出了相应的示例代码。Subject 是 RxJS 的重要概念之一,它允许 Observable 与 Observer 之间进行交互,并可以按照特定的策略将数据传递给订阅者。Subject 的基本用法和几种常见类型都有着不同的特点和应用场景,需要根据具体业务场景进行选择和使用。在使用 Subject 时,我们需要特别注意订阅的顺序、持久化的观察者以及自动销毁等方面的问题,以免造成不必要的麻烦。

来源:JavaScript中文网 ,转载请注明来源 https://www.javascriptcn.com/post/648d2a3348841e9894b76a74

纠错
反馈