首页 文章

如何使用RxJS将一系列Promise转换为Rx.Observable?

提问于
浏览
1

我正在尝试使用RxJS从一系列Promises中创建一个 Rx.Observable . 与this question的区别在于我的Promise数量未知,而且每个Promise都取决于前一个Promise的结果 .

基本上我有一系列页面,连接“下一页”链接 .

我想要的功能是:

  • 等待承诺<>

  • 提供结果(fire observer.onNext())

  • 检查是否有下一页链接

  • 使用该链接创建下一个Promise <>

  • 重复直到有剩余页面

我尝试了以下方法:

private getPages<T>(firstPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {

    let observable = Rx.Observable.create<T>(async obs => {
        let page = await firstPromise;
        page.value.forEach(v => obs.onNext(v));

        while (page['@odata.nextLink']) {
            let nextPageUrl = <string>page['@odata.nextLink'];
            let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
            page = await nextPagePromise;
            page.value.forEach(v => obs.onNext(v));
        }

        obs.onCompleted();
    });

    return observable;
}

IODataCollectionResult 是一个OData结果,其中'@odata.nextLink'是下一页的url而.value是一个值数组)

The problem is 我无法用TypeScript编译它,它给了我一个错误:

类型'(obs:Observer)=> Promise'的参数不能赋予类型'的参数'(observer:Observer)=> void |功能| IDisposable的” .

这是有道理的,因为异步函数返回 Promise<void> ,而不是 void .

这是否意味着我不能对Rx.Observable.create()使用async / await?如何将一系列Promise链接到一个Observable?

2 回答

  • 1

    你可以将 async function 包含在使结果无效的东西中:

    function toVoid<A>(fn: A => Any): A => Void {
        return x => void fn(x)
    }
    

    (原谅我对TypeScript缺乏了解,但我希望你能猜到它应该做什么)

    有了这个,你应该可以打电话

    let observable = Rx.Observable.create<T>(toVoid(async obs => {
        …
    }));
    

    但也许你不应该这样做 . 不要丢弃promise,而是使用它来附加相应的错误处理程序:

    let observable = Rx.Observable.create<T>(obs => {
        (async () => {
            …
        }()).catch(err => {
            obs.onError(err);
        });
    });
    
  • 0

    使用.then()递归解决了这个问题,没有async / await:

    private getPages<T>(initialPromise: PromiseLike<IODataCollectionResult<T>>): Rx.Observable<T> {
        return Rx.Observable.create<T>(obs => {
            const getPage = (promise: PromiseLike<IODataCollectionResult<T>>) => {
                promise.then(page => {
                    page.value.forEach(v => obs.onNext(v));
                    if (page['@odata.nextLink']) {
                        let nextPageUrl = <string>page['@odata.nextLink'];
                        let nextPagePromise = <PromiseLike<IODataCollectionResult<T>>>this.resource(nextPageUrl).get().$promise;
                        getPage(nextPagePromise);
                    }
                    else {
                        obs.onCompleted();
                    }
                });
            }
            getPage(initialPromise);
        });
    }
    

相关问题