什么是 Subject
关于 Subject 的定义,官方文档的解释算是十分清晰而明确了:
换句话说,可以将 Subject 对象看成一个多播的 Observable 对象,源代码对于 Subject 对象和 Observable 对象之间的继承关系亦十分明确:
class Subject<T> extends Observable<T> implements SubscriptionLike {}
因此,我们本期学习的重点,落在两个方面:其一,Subject 多播的能力是如何设计的;其二,Subject 相对 Observable 同名方法的差异所在。在某些情况下,上述两个问题实质上是一个问题。接下来,我们同样从一个简单的示例入手:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
const observerA = {
next: (v) => console.log(`observerA: ${v}`)
};
const observerB = {
next: (v) => console.log(`observerB: ${v}`)
};
subject.subscribe(observerA);
subject.subscribe(observerB);
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
首先,我们创建了一个数据流对象subject
,内部数据类型为number
。当对象subject
调用订阅函数,其实际执行的是 Observable.subscribe
;同时,对象 observerA 并非一个 Subscriber 对象实例,代码会创建一个 SafeSubscriber 实例,可得:
this.destination = {
next: (v) => console.log(`observerA: ${v}`),
error: (err) => throw err, // defaultErrorHandler
complete: () => {}, // noop
}
在上述例子中,我们并未赋予 subject
对象内部属性 source
和 operators
相应的值,因此,其方法 subscribe
实际上执行的是 Subject._trySubscribe
方法,本质上依旧是 Observable 对象的 _trySubscribe
方法:
class Subject<T> extends Observable<T> implements SubscriptionLike {
/** ... */
protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
this._throwIfClosed();
return super._trySubscribe(subscriber);
}
protected _throwIfClosed() {
if (this.closed) {
throw new ObjectUnsubscribedError();
}
}
/** ... */
}
要知道,对于变量 subject
对象,其内部的 this
指向的是 Subject
对象,因此,其订阅函数最终执行的是:
class Subject<T> extends Observable<T> implements SubscriptionLike {
/** ... */
protected _subscribe(subscriber: Subscriber<T>): Subscription {
this._throwIfClosed();
this._checkFinalizedStatuses(subscriber);
return this._innerSubscribe(subscriber);
}
protected _innerSubscribe(subscriber: Subscriber<any>) {
const { hasError, isStopped, observers } = this;
return hasError || isStopped
? EMPTY_SUBSCRIPTION
: (observers.push(subscriber), new Subscription(() => arrRemove(observers, subscriber)));
}
protected _checkFinalizedStatuses(subscriber: Subscriber<any>) {
const { hasError, thrownError, isStopped } = this;
if (hasError) {
subscriber.error(thrownError);
} else if (isStopped) {
subscriber.complete();
}
}
/** ... */
}
在上述示例中,当 subject
执行 subscribe()
方法时,传入的 observer 会被添加到 obervers 数组末尾,同时,创建一个 initialTeardown
为如下函数的 Subscription 对象,当其退订时,会执行该函数,从 observers 中移除该 observer:
initialTeardown: () => arrRemove(observers, subscriber));
故而,Subject 相对于 Observable 多播的能力基础,便在于对其 observers
属性的管理。接下来,我们看看 Subject 数据流是如何多播的?顾名思义,多播意味着数据源向多个 Subscriber / Observer 推送数据,因此我们仅需了解一波 Subject 对象的 next 方法即可:
class Subject<T> extends Observable<T> implements SubscriptionLike {
/** ... */
next(value: T) {
this._throwIfClosed();
if (!this.isStopped) {
const copy = this.observers.slice();
for (const observer of copy) {
observer.next(value);
}
}
}
error(err: any) {
this._throwIfClosed();
if (!this.isStopped) {
this.hasError = this.isStopped = true;
this.thrownError = err;
const { observers } = this;
while (observers.length) {
observers.shift()!.error(err);
}
}
}
complete() {
this._throwIfClosed();
if (!this.isStopped) {
this.isStopped = true;
const { observers } = this;
while (observers.length) {
observers.shift()!.complete();
}
}
}
/** ... */
}
从代码中看,Subject 对象多播能力的本质是递归调用 Observer.next,并没有什么神秘之处。最后,我们再来看看 Subject 与 Observable 不同的函数方法,其中尤为特别的是 unsubscribe()
方法:
class Subject<T> extends Observable<T> implements SubscriptionLike {
/** ... */
unsubscribe(): void {
this.isStopped = this.closed = true;
this.observers = null!;
}
/** ... */
}
Suject.unsubscribe 方法的目的是为了清空自己的 Observers;有意思的是,之前我们学过的与之同名且易混淆的 Subscription.unsubscribe 方法,其目的是 Subscriber / Observer 自身取消对 Observable / Subject 的订阅。
最后,我们再看一下日常使用频率颇高的 Subject.asObservable
方法:
class Subject<T> extends Observable<T> implements SubscriptionLike {
/** ... */
asObservable(): Observable<T> {
const observable: any = new Observable<T>();
observable.source = this;
return observable;
}
/** ... */
}
前面学习 Observable 时,我们发现其 source
属性似乎一直是 undefined,很难领会其作用;然而其对于 Subject.asObservable 却是不可或缺的:asObservable
方法创建了一个新的 Observable 对象实例,并设置其 source
属性为 Subject 对象自身,也就是说,Subject 对象代替 Subscriber 成为了 Observable 对象的数据源,Subject 对象推送的数据能够被 Observable 对象的 Observer 订阅获得。一个普通的 Observable 对象,其调用 subscribe 函数方法,实质上是新增了一个 Subscriber;基于 Subject.asObservable 方法创建的 Observable 对象,其调用 subscribe 函数方法,本质上是为 Subject 新增了一个 Observer。接下来,我们来看看 Subject 的几种对象变体(the variants of Subjects)。
BehaviorSubject 是什么
BehaviorSubject 对象相对于 Subject 对象,其总是保存数据流推送的最近一个数据,任意新的订阅者总是收到最新值。直接看示例:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(3);
// Logs
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
不难发现,当我们创建一个新的 BehaviorSubject 对象实例时,需要传入初始值参数,表示当前数据流最新值,其源代码亦十分简洁,并允许我们直接获取最新值:
class BehaviorSubject<T> extends Subject<T> {
constructor(private _value: T) {
super();
}
get value(): T {
return this.getValue();
}
getValue(): T {
const { hasError, thrownError, _value } = this;
if (hasError) {
throw thrownError;
}
this._throwIfClosed();
return _value;
}
/** ...*/
}
基于 TypeScript 语法规则,在 construtor()
中直接传入 private 参数,相当于自动声明,对象存在一个同名属性。要知道,对于 BehaviorSubject 对象,每次订阅均能获取最新值,那么第一次订阅就需要拿到最新值,这就要求 BehaviorSubject 对象需要一个初始值:
class BehaviorSubject<T> extends Subject<T> {
/** ...*/
protected _subscribe(subscriber: Subscriber<T>): Subscription {
const subscription = super._subscribe(subscriber);
!subscription.closed && subscriber.next(this._value);
return subscription;
}
next(value: T): void {
super.next((this._value = value));
}
}
BehaviorSubject 对象的订阅基本与 Subject 订阅方法一致,不同的是,其需要直接给出最新值,因而设若数据流尚未结束,需要执行subscriber.next(``this``._value)
;同时,当推送新的数据时,需要更新最新值。
AsyncSubject 是什么
对 Observers 来说,AsyncSubject 与 BahaviorSubject 相似,同样能从中获取最新值;不同的是,订阅AsyncSubject 对象,只有当数据流结束之后,才能拿到结束前的最新值。比如:
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Logs:
// observerA: 5
// observerB: 5
可以想象,AsyncSubject 对象执行 next
方法时,并不会马上推送新值,而是更新当前最新值;当执行 complete
方法时,更新最新值的同时,推送该值。RxJS 相关逻辑的源代码亦十分容易理解:
class AsyncSubject<T> extends Subject<T> {
private _value: T | null = null;
private _hasValue = false;
private _isComplete = false;
/** @internal */
protected _checkFinalizedStatuses(subscriber: Subscriber<T>) {
const { hasError, _hasValue, _value, thrownError, isStopped } = this;
if (hasError) {
subscriber.error(thrownError);
} else if (isStopped) {
_hasValue && subscriber.next(_value!);
subscriber.complete();
}
}
next(value: T): void {
if (!this.isStopped) {
this._value = value;
this._hasValue = true;
}
}
complete(): void {
const { _hasValue, _value, _isComplete } = this;
if (!_isComplete) {
this._isComplete = true;
_hasValue && super.next(_value!);
super.complete();
}
}
}
ReplaySubject
ReplaySubject 对象的关键即在于「REPLAY」,按照官方文档的描述:
也就是说,当 ReplaySubejct 有新的订阅者时,会将存储的多个值重新推送给新的订阅者,值的数量取决于数量窗口大小或时间窗口大小。比如:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // buffer 3 values for new subscribers
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
上述例子中,ReplaySubject 对象实例的数量窗口大小为 3,故而当新的订阅者 Observer B 甫一订阅,便收到了三个旧值。除此之外,ReplaySubject 还允许在数量窗口的基础上设置时间窗口,比如:
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(100, 500 /* windowTime */);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
// Logs
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
如上所示,我们设置 ReplaySubject 对象实例的数量窗口为100,时间窗口为500ms。当新的订阅者订阅该对象时,时间已经过去了 1000ms,因此需要将500ms内,数量上限为100的值推送给新的订阅者,因此获得了3个值。接下来,我们看一下 ReplaySubject 的源代码实现。
class ReplaySubject<T> extends Subject<T> {
private _buffer: (T | number)[] = [];
private _infiniteTimeWindow = true;
constructor(
private _bufferSize = Infinity,
private _windowTime = Infinity,
private _timestampProvider: TimestampProvider = dateTimestampProvider
) {
super();
this._infiniteTimeWindow = _windowTime === Infinity;
this._bufferSize = Math.max(1, _bufferSize);
this._windowTime = Math.max(1, _windowTime);
}
/** ... */
}
显然,ReplaySubject 对象允许传入三个参数,前二者分别表示数量窗口和时间窗口,其默认值为无限值。
class ReplaySubject<T> extends Subject<T> {
/** ... */
next(value: T): void {
const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
if (!isStopped) {
_buffer.push(value);
!_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
}
this._trimBuffer();
super.next(value);
}
/** ... */
}
ReplaySubject 对象执行 next 方法,除了要执行 super.next(value)
,还需要将新值添到 _buffer
属性末尾,并记录时间,在此之前,需要对 _buffer 进行剪枝:
class ReplaySubject<T> extends Subject<T> {
/** ... */
private _trimBuffer() {
const { _bufferSize, _timestampProvider, _buffer, _infiniteTimeWindow } = this;
// If we don't have an infinite buffer size, and we're over the length,
// use splice to truncate the old buffer values off. Note that we have to
// double the size for instances where we're not using an infinite time window
// because we're storing the values and the timestamps in the same array.
const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
_bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);
// Now, if we're not in an infinite time window, remove all values where the time is
// older than what is allowed.
if (!_infiniteTimeWindow) {
const now = _timestampProvider.now();
let last = 0;
// Search the array for the first timestamp that isn't expired and
// truncate the buffer up to that point.
for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
last = i;
}
last && _buffer.splice(0, last + 1);
}
}
/** ... */
}
剪枝的策略十分简单,首先计算得到当前 BufferSize,并从原 _buffer
中从头开始删除,直到其大小等于 BufferSize 为止;接下来,若是存在时间窗口,则需要依据当前时间和时间窗口,清除时间窗口之外的值,需要注意的是,对于 _buffer 中的每一个奇数位元素,均为数据流中的值;每一个偶数位的值,均为加上了时间窗口的时间戳。
protected _subscribe(subscriber: Subscriber<T>): Subscription {
this._throwIfClosed();
this._trimBuffer();
const subscription = this._innerSubscribe(subscriber);
const { _infiniteTimeWindow, _buffer } = this;
// We use a copy here, so reentrant code does not mutate our array while we're
// emitting it to a new subscriber.
const copy = _buffer.slice();
for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
subscriber.next(copy[i] as T);
}
this._checkFinalizedStatuses(subscriber);
return subscription;
}
当 ReplaySubject 新增订阅者时,继承 Subject 的 _innerSubscribe
方法同时,会复制一份保存的旧数据,推送给新的订阅者。
下一步
学习完 RxJS 核心三巨头 Observable、Subject、Subscription,终于到了最眼花缭乱的 Operators 模块。有了 pipe() 的存在,Operators 有了极大的用武之地,也使得 RxJS 能够处理各种复杂的场景。接下来,我们会花一段时间学习 Operators,尽情期待!
常见问题FAQ
- 免费下载或者VIP会员专享资源能否直接商用?
- 本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
- 提示下载完但解压或打开不了?
- 找不到素材资源介绍文章里的示例图片?
- 模板不会安装或需要功能定制以及二次开发?
发表评论
还没有评论,快来抢沙发吧!