首页 文章

RXJS:向Observable添加一个函数,以便在订阅时执行(延迟)

提问于
浏览
3

向Observable添加一个函数,以便在订阅时执行(延迟)

我有一个由事件制成的Observable . 在这种情况下,蓝牙通知 .

我想只在有人订阅Observable时运行一个函数(startNotifictions) .

此代码在以前的版本中确实有效 . 它与Ionic3框架有关 . 它添加了一个新的运算符,在订阅时运行 . 现在,转换器有类型的问题,抱怨两次,.doOnSubscribe在typedef Observable any>和<{}>上不可用 .

任何人都知道如何正确输入该类型?可能延长?试图直接使用.defer,无济于事 .

// add operator doOnSubscribe to the event observable
        Observable.prototype.doOnSubscribe = function(onSubscribe) {
            let source = this;
            return Observable.defer(() => {
                onSubscribe();
                return source;
            });
        };

        // return the Observable for the notify char, with startNotify on first subscribe
        getUartDataNote( Observable.fromEvent( this.uartChar, 'characteristicvaluechanged' )
            .doOnSubscribe(() => {
                console.log('starting note');
                this.uartChar.startNotifications();
            })
            .map( value => String.fromCharCode.apply( null, new Uint8Array( this.uartChar.value.buffer )))
            .takeUntil( Observable.fromEvent( this.gatt.device, 'gattserverdisconnected' ))
            .finally(() => {
                console.log( 'stream disconnected ');
                // not necessary: return this.uartChar.stopNotifications()
            })
            .share()
        );

2 回答

  • 2

    以下是编写类型扩充的方法 .

    export {}
    
    declare module 'rxjs/Observable' {
      interface Observable<T> {
        doOnSubscribe(onSubscribe: () => void): this;
      }
    }
    

    这在TypeScript手册的Declaration Merging部分中有记录 .

  • 3

    如果您使用的是 rxjs 版本5,则可以使用纯函数实现doOnSubscribe,而不是修补原型 . 这样您就不需要类型扩充了 .

    import {defer} from 'rxjs/observable/defer';
    import {Observable} from 'rxjs/Observable';
    
    /** Example
    import {from} from 'rxjs/observable/from';
    
    from([1, 2, 3])
        .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
        .subscribe(x => console.log(x), null, () => console.log('completed'));
    */
    
    export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) =>  Observable<T> {
        return function inner(source: Observable<T>): Observable<T> {
            return defer(() => {
              onSubscribe();
    
              return source;
            });
        };
    }
    

    https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d

相关问题