最新公告
  • 欢迎您光临起源地模板网,本站秉承服务宗旨 履行“站长”责任,销售只是起点 服务永无止境!立即加入钻石VIP
  • RxJS 源代码学习(四)—— Subjects

    正文概述 掘金(HyBin)   2021-08-01   763

    什么是 Subject

    关于 Subject 的定义,官方文档的解释算是十分清晰而明确了:

    换句话说,可以将 Subject 对象看成一个多播的 Observable 对象,源代码对于 Subject 对象和 Observable 对象之间的继承关系亦十分明确:

    class Subject<T> extends Observable<T> implements SubscriptionLike {}
    

    因此,我们本期学习的重点,落在两个方面:其一,Subject 多播的能力是如何设计的;其二,Subject 相对 Observable 同名方法的差异所在。在某些情况下,上述两个问题实质上是一个问题。接下来,我们同样从一个简单的示例入手:

    import { Subject } from 'rxjs';
    
    const subject = new Subject<number>();
    
    const observerA = {
      next: (v) => console.log(`observerA: ${v}`)
    };
    const observerB = {
      next: (v) => console.log(`observerB: ${v}`)
    };
    
    subject.subscribe(observerA);
    subject.subscribe(observerB);
     
    subject.next(1);
    subject.next(2);
    
    // Logs:
    // observerA: 1
    // observerB: 1
    // observerA: 2
    // observerB: 2
    

    首先,我们创建了一个数据流对象subject,内部数据类型为number。当对象subject调用订阅函数,其实际执行的是 Observable.subscribe;同时,对象 observerA 并非一个 Subscriber 对象实例,代码会创建一个 SafeSubscriber 实例,可得:

    this.destination = {
        next: (v) => console.log(`observerA: ${v}`),
        error: (err) => throw err, // defaultErrorHandler
        complete: () => {}, // noop
    }
    

    在上述例子中,我们并未赋予 subject 对象内部属性 sourceoperators 相应的值,因此,其方法 subscribe 实际上执行的是 Subject._trySubscribe 方法,本质上依旧是 Observable 对象的 _trySubscribe 方法:

    class Subject<T> extends Observable<T> implements SubscriptionLike {
        /** ... */
        protected _trySubscribe(subscriber: Subscriber<T>): TeardownLogic {
            this._throwIfClosed();
            return super._trySubscribe(subscriber);
        }
        
        protected _throwIfClosed() {
            if (this.closed) {
                throw new ObjectUnsubscribedError();
            }
        }
        /** ... */
    }
    

    要知道,对于变量 subject 对象,其内部的 this 指向的是 Subject 对象,因此,其订阅函数最终执行的是:

    class Subject<T> extends Observable<T> implements SubscriptionLike {
        /** ... */
        protected _subscribe(subscriber: Subscriber<T>): Subscription {
            this._throwIfClosed();
            this._checkFinalizedStatuses(subscriber);
            return this._innerSubscribe(subscriber);
        }
        
        protected _innerSubscribe(subscriber: Subscriber<any>) {
            const { hasError, isStopped, observers } = this;
            return hasError || isStopped
                ? EMPTY_SUBSCRIPTION
                : (observers.push(subscriber), new Subscription(() => arrRemove(observers, subscriber)));
        }
        
        protected _checkFinalizedStatuses(subscriber: Subscriber<any>) {
            const { hasError, thrownError, isStopped } = this;
            if (hasError) {
                subscriber.error(thrownError);
            } else if (isStopped) {
                subscriber.complete();
            }
        }
        /** ... */
    }
    

    在上述示例中,当 subject 执行 subscribe() 方法时,传入的 observer 会被添加到 obervers 数组末尾,同时,创建一个 initialTeardown 为如下函数的 Subscription 对象,当其退订时,会执行该函数,从 observers 中移除该 observer:

    initialTeardown: () => arrRemove(observers, subscriber));
    

    故而,Subject 相对于 Observable 多播的能力基础,便在于对其 observers 属性的管理。接下来,我们看看 Subject 数据流是如何多播的?顾名思义,多播意味着数据源向多个 Subscriber / Observer 推送数据,因此我们仅需了解一波 Subject 对象的 next 方法即可:

    class Subject<T> extends Observable<T> implements SubscriptionLike {
        /** ... */
        next(value: T) {
            this._throwIfClosed();
            if (!this.isStopped) {
                const copy = this.observers.slice();
                for (const observer of copy) {
                    observer.next(value);
                }
            }
        }
    
        error(err: any) {
            this._throwIfClosed();
            if (!this.isStopped) {
                this.hasError = this.isStopped = true;
                this.thrownError = err;
                const { observers } = this;
                while (observers.length) {
                    observers.shift()!.error(err);
                }
            }
        }
    
        complete() {
            this._throwIfClosed();
            if (!this.isStopped) {
                this.isStopped = true;
                const { observers } = this;
                while (observers.length) {
                    observers.shift()!.complete();
                }
            }
        }
        /** ... */
    }
    

    从代码中看,Subject 对象多播能力的本质是递归调用 Observer.next,并没有什么神秘之处。最后,我们再来看看 Subject 与 Observable 不同的函数方法,其中尤为特别的是 unsubscribe() 方法:

    class Subject<T> extends Observable<T> implements SubscriptionLike {
        /** ... */
        unsubscribe(): void {
            this.isStopped = this.closed = true;
            this.observers = null!;
        }
        /** ... */
    }
    

    Suject.unsubscribe 方法的目的是为了清空自己的 Observers;有意思的是,之前我们学过的与之同名且易混淆的 Subscription.unsubscribe 方法,其目的是 Subscriber / Observer 自身取消对 Observable / Subject 的订阅。

    最后,我们再看一下日常使用频率颇高的 Subject.asObservable 方法:

    class Subject<T> extends Observable<T> implements SubscriptionLike {
        /** ... */
        asObservable(): Observable<T> {
            const observable: any = new Observable<T>();
            observable.source = this;
            return observable;
        }
        /** ... */
    }
    

    前面学习 Observable 时,我们发现其 source 属性似乎一直是 undefined,很难领会其作用;然而其对于 Subject.asObservable 却是不可或缺的:asObservable 方法创建了一个新的 Observable 对象实例,并设置其 source 属性为 Subject 对象自身,也就是说,Subject 对象代替 Subscriber 成为了 Observable 对象的数据源,Subject 对象推送的数据能够被 Observable 对象的 Observer 订阅获得。一个普通的 Observable 对象,其调用 subscribe 函数方法,实质上是新增了一个 Subscriber;基于 Subject.asObservable 方法创建的 Observable 对象,其调用 subscribe 函数方法,本质上是为 Subject 新增了一个 Observer。接下来,我们来看看 Subject 的几种对象变体(the variants of Subjects)。

    BehaviorSubject 是什么

    BehaviorSubject 对象相对于 Subject 对象,其总是保存数据流推送的最近一个数据,任意新的订阅者总是收到最新值。直接看示例:

    import { BehaviorSubject } from 'rxjs';
    const subject = new BehaviorSubject(0); // 0 is the initial value
    
    subject.subscribe({
        next: (v) => console.log(`observerA: ${v}`)
    });
     
    subject.next(1);
    subject.next(2);
     
    subject.subscribe({
        next: (v) => console.log(`observerB: ${v}`)
    });
     
    subject.next(3);
    
    // Logs
    // observerA: 0
    // observerA: 1
    // observerA: 2
    // observerB: 2
    // observerA: 3
    // observerB: 3
    

    不难发现,当我们创建一个新的 BehaviorSubject 对象实例时,需要传入初始值参数,表示当前数据流最新值,其源代码亦十分简洁,并允许我们直接获取最新值:

    class BehaviorSubject<T> extends Subject<T> {
        constructor(private _value: T) {
            super();
        }
    
        get value(): T {
            return this.getValue();
        }
       
        getValue(): T {
            const { hasError, thrownError, _value } = this;
            if (hasError) {
                throw thrownError;
            }
            this._throwIfClosed();
            return _value;
        }
        
        /** ...*/
    }
    

    基于 TypeScript 语法规则,在 construtor() 中直接传入 private 参数,相当于自动声明,对象存在一个同名属性。要知道,对于 BehaviorSubject 对象,每次订阅均能获取最新值,那么第一次订阅就需要拿到最新值,这就要求 BehaviorSubject 对象需要一个初始值:

    class BehaviorSubject<T> extends Subject<T> {
        /** ...*/
        protected _subscribe(subscriber: Subscriber<T>): Subscription {
            const subscription = super._subscribe(subscriber);
            !subscription.closed && subscriber.next(this._value);
            return subscription;
        }
        
        next(value: T): void {
            super.next((this._value = value));
        }
    }
    

    BehaviorSubject 对象的订阅基本与 Subject 订阅方法一致,不同的是,其需要直接给出最新值,因而设若数据流尚未结束,需要执行subscriber.next(``this``._value);同时,当推送新的数据时,需要更新最新值。

    AsyncSubject 是什么

    对 Observers 来说,AsyncSubject 与 BahaviorSubject 相似,同样能从中获取最新值;不同的是,订阅AsyncSubject 对象,只有当数据流结束之后,才能拿到结束前的最新值。比如:

    import { AsyncSubject } from 'rxjs';
    
    const subject = new AsyncSubject();
     
    subject.subscribe({
      next: (v) => console.log(`observerA: ${v}`)
    });
     
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
     
    subject.subscribe({
      next: (v) => console.log(`observerB: ${v}`)
    });
     
    subject.next(5);
    subject.complete();
     
    // Logs:
    // observerA: 5
    // observerB: 5
    

    可以想象,AsyncSubject 对象执行 next 方法时,并不会马上推送新值,而是更新当前最新值;当执行 complete 方法时,更新最新值的同时,推送该值。RxJS 相关逻辑的源代码亦十分容易理解:

    class AsyncSubject<T> extends Subject<T> {
        private _value: T | null = null;
    
        private _hasValue = false;
    
        private _isComplete = false;
    
        /** @internal */
        protected _checkFinalizedStatuses(subscriber: Subscriber<T>) {
            const { hasError, _hasValue, _value, thrownError, isStopped } = this;
            if (hasError) {
                subscriber.error(thrownError);
            } else if (isStopped) {
                _hasValue && subscriber.next(_value!);
                subscriber.complete();
            }
        }
        
        next(value: T): void {
            if (!this.isStopped) {
                this._value = value;
                this._hasValue = true;
            }
        }
    
        complete(): void {
            const { _hasValue, _value, _isComplete } = this;
            if (!_isComplete) {
                this._isComplete = true;
                _hasValue && super.next(_value!);
                super.complete();
            }
        }
    }
    

    ReplaySubject

    ReplaySubject 对象的关键即在于「REPLAY」,按照官方文档的描述:

    也就是说,当 ReplaySubejct 有新的订阅者时,会将存储的多个值重新推送给新的订阅者,值的数量取决于数量窗口大小或时间窗口大小。比如:

    import { ReplaySubject } from 'rxjs';
    
    const subject = new ReplaySubject(3); // buffer 3 values for new subscribers
     
    subject.subscribe({
      next: (v) => console.log(`observerA: ${v}`)
    });
     
    subject.next(1);
    subject.next(2);
    subject.next(3);
    subject.next(4);
     
    subject.subscribe({
      next: (v) => console.log(`observerB: ${v}`)
    });
     
    subject.next(5);
     
    // Logs:
    // observerA: 1
    // observerA: 2
    // observerA: 3
    // observerA: 4
    // observerB: 2
    // observerB: 3
    // observerB: 4
    // observerA: 5
    // observerB: 5
    

    上述例子中,ReplaySubject 对象实例的数量窗口大小为 3,故而当新的订阅者 Observer B 甫一订阅,便收到了三个旧值。除此之外,ReplaySubject 还允许在数量窗口的基础上设置时间窗口,比如:

    import { ReplaySubject } from 'rxjs';
    const subject = new ReplaySubject(100, 500 /* windowTime */);
     
    subject.subscribe({
      next: (v) => console.log(`observerA: ${v}`)
    });
     
    let i = 1;
    setInterval(() => subject.next(i++), 200);
     
    setTimeout(() => {
      subject.subscribe({
        next: (v) => console.log(`observerB: ${v}`)
      });
    }, 1000);
    
    // Logs
    // observerA: 1
    // observerA: 2
    // observerA: 3
    // observerA: 4
    // observerA: 5
    // observerB: 3
    // observerB: 4
    // observerB: 5
    // observerA: 6
    // observerB: 6
    

    如上所示,我们设置 ReplaySubject 对象实例的数量窗口为100,时间窗口为500ms。当新的订阅者订阅该对象时,时间已经过去了 1000ms,因此需要将500ms内,数量上限为100的值推送给新的订阅者,因此获得了3个值。接下来,我们看一下 ReplaySubject 的源代码实现。

    class ReplaySubject<T> extends Subject<T> {
        private _buffer: (T | number)[] = [];
        private _infiniteTimeWindow = true;
        
        constructor(
            private _bufferSize = Infinity,
            private _windowTime = Infinity,
            private _timestampProvider: TimestampProvider = dateTimestampProvider
        ) {
            super();
            this._infiniteTimeWindow = _windowTime === Infinity;
            this._bufferSize = Math.max(1, _bufferSize);
            this._windowTime = Math.max(1, _windowTime);
      }
      /** ... */
    }
    

    显然,ReplaySubject 对象允许传入三个参数,前二者分别表示数量窗口和时间窗口,其默认值为无限值。

    class ReplaySubject<T> extends Subject<T> {
        /** ... */
        next(value: T): void {
            const { isStopped, _buffer, _infiniteTimeWindow, _timestampProvider, _windowTime } = this;
            if (!isStopped) {
                _buffer.push(value);
                !_infiniteTimeWindow && _buffer.push(_timestampProvider.now() + _windowTime);
            }
            this._trimBuffer();
            super.next(value);
        }
        /** ... */
    }
    

    ReplaySubject 对象执行 next 方法,除了要执行 super.next(value),还需要将新值添到 _buffer 属性末尾,并记录时间,在此之前,需要对 _buffer 进行剪枝:

    class ReplaySubject<T> extends Subject<T> {
        /** ... */
        private _trimBuffer() {
            const { _bufferSize, _timestampProvider, _buffer, _infiniteTimeWindow } = this;
            // If we don't have an infinite buffer size, and we're over the length,
            // use splice to truncate the old buffer values off. Note that we have to
            // double the size for instances where we're not using an infinite time window
            // because we're storing the values and the timestamps in the same array.
            const adjustedBufferSize = (_infiniteTimeWindow ? 1 : 2) * _bufferSize;
            _bufferSize < Infinity && adjustedBufferSize < _buffer.length && _buffer.splice(0, _buffer.length - adjustedBufferSize);
    
            // Now, if we're not in an infinite time window, remove all values where the time is
    
            // older than what is allowed.
            if (!_infiniteTimeWindow) {
                const now = _timestampProvider.now();
                let last = 0;
                // Search the array for the first timestamp that isn't expired and
                // truncate the buffer up to that point.
                for (let i = 1; i < _buffer.length && (_buffer[i] as number) <= now; i += 2) {
                    last = i;
                }
                last && _buffer.splice(0, last + 1);
            }
        }
    
        /** ... */
    
    }
    

    剪枝的策略十分简单,首先计算得到当前 BufferSize,并从原 _buffer 中从头开始删除,直到其大小等于 BufferSize 为止;接下来,若是存在时间窗口,则需要依据当前时间和时间窗口,清除时间窗口之外的值,需要注意的是,对于 _buffer 中的每一个奇数位元素,均为数据流中的值;每一个偶数位的值,均为加上了时间窗口的时间戳。

    protected _subscribe(subscriber: Subscriber<T>): Subscription {
        this._throwIfClosed();
        this._trimBuffer();
    
        const subscription = this._innerSubscribe(subscriber);
    
        const { _infiniteTimeWindow, _buffer } = this;
        // We use a copy here, so reentrant code does not mutate our array while we're
        // emitting it to a new subscriber.
        const copy = _buffer.slice();
        for (let i = 0; i < copy.length && !subscriber.closed; i += _infiniteTimeWindow ? 1 : 2) {
          subscriber.next(copy[i] as T);
        }
    
        this._checkFinalizedStatuses(subscriber);
        return subscription;
    }
    

    当 ReplaySubject 新增订阅者时,继承 Subject 的 _innerSubscribe 方法同时,会复制一份保存的旧数据,推送给新的订阅者。

    下一步

    学习完 RxJS 核心三巨头 Observable、Subject、Subscription,终于到了最眼花缭乱的 Operators 模块。有了 pipe() 的存在,Operators 有了极大的用武之地,也使得 RxJS 能够处理各种复杂的场景。接下来,我们会花一段时间学习 Operators,尽情期待!


    起源地下载网 » RxJS 源代码学习(四)—— Subjects

    常见问题FAQ

    免费下载或者VIP会员专享资源能否直接商用?
    本站所有资源版权均属于原作者所有,这里所提供资源均只能用于参考学习用,请勿直接商用。若由于商用引起版权纠纷,一切责任均由使用者承担。更多说明请参考 VIP介绍。
    提示下载完但解压或打开不了?
    最常见的情况是下载不完整: 可对比下载完压缩包的与网盘上的容量,若小于网盘提示的容量则是这个原因。这是浏览器下载的bug,建议用百度网盘软件或迅雷下载。若排除这种情况,可在对应资源底部留言,或 联络我们.。
    找不到素材资源介绍文章里的示例图片?
    对于PPT,KEY,Mockups,APP,网页模版等类型的素材,文章内用于介绍的图片通常并不包含在对应可供下载素材包内。这些相关商业图片需另外购买,且本站不负责(也没有办法)找到出处。 同样地一些字体文件也是这种情况,但部分素材会在素材包内有一份字体下载链接清单。
    模板不会安装或需要功能定制以及二次开发?
    请QQ联系我们

    发表评论

    还没有评论,快来抢沙发吧!

    如需帝国cms功能定制以及二次开发请联系我们

    联系作者

    请选择支付方式

    ×
    迅虎支付宝
    迅虎微信
    支付宝当面付
    余额支付
    ×
    微信扫码支付 0 元