首页 文章

来自EventEmitter的Hot和shared Observable

提问于
浏览
4

有没有办法从 EventEmitter (或Angular 2 alpha 46 / RxJS 5 alpha中可用的等效物)获得热观察?即 if we subscribe after the value is resolved, it triggers with the previously resolved value . 与我们总是返回相同承诺时的情况类似 .

理想情况下,只使用Angular 2对象(我稍后会在某处读取轻量级RxJS以删除依赖项),否则导入RxJS就可以了 . AsyncSubject似乎符合我的需要,但在RxJS 5 alpha中不可用 .

我尝试了以下,没有成功(从不触发) . 有关如何使用它的任何想法?

let emitter = new EventEmitter<MyObj>();
setTimeout(() => {emitter.next(new MyObj());});
this.observable = emitter;
return this.observable.share();

Full plunker here comparing hot and cold

Usecase :仅一次访问一些异步对象(例如,在 new EventEmitter 中合并/包装一系列HTTP调用),但将已解析的异步对象提供给订阅它的任何服务/组件,即使它们在解析后订阅(HTTP)收到回复) .

编辑:问题不是关于如何合并HTTP响应,而是如何从EventEmitter或任何等效的Angular 2 alpha 46 / RxJS 5 alpha获得(hot?)observable,允许在检索/解析异步结果后进行订阅(HTTP只是异步原点的一个例子) . myEventEmitter.share() does not work (参见上面的plunker),虽然它适用于HTTP返回的Observable(参见plunker from @Eric Martinez) . 从Angular 2 alpha 46开始,.toRx()方法不再存在,EventEmitter是observable和subject本身 .

只要我们总是返回相同的promise对象,这对promises来说效果很好 . 由于我们有观察者介绍了HTTP Angular 2服务,我想避免混合使用promises和观察者(据说观察者比promises更强大,因此它应该允许使用promise进行简单的操作) .

Specs about share()(我没有找到版本5 alpha的文档 - Angular 2使用的版本) - 处理由Angular 2 HTTP服务返回的 Observable ,而不是在EventEmitter上工作 .

编辑:澄清为什么不使用HTTP返回的Observable,并补充说不直接使用RxJS会更好 .

编辑:更改说明:关注的是多个订阅,而不是合并HTTP结果 .

谢谢!

2 回答

  • 3

    您似乎描述的功能不是冷可观察的功能,而是超过 Rx.BehaviourSubject . 看看这里有关Rxjs主题的解释:https://github.com/Reactive-Extensions/RxJS/blob/master/doc/gettingstarted/subjects.md .

    我引用那里:

    BehaviourSubject类似于ReplaySubject,只是它只存储了它发布的最后一个值 . BehaviourSubject在初始化时也需要默认值 . 当主体尚未接收到其他值时,将该值发送给观察者 . 这意味着除非主题已经完成,否则所有订阅者将在订阅时立即收到值 .

    Rx.AsyncSubject 将是最接近承诺的行为:

    AsyncSubject类似于Replay和Behavior主题,但它只存储最后一个值,并且只在序列完成时才发布它 . 您可以在源可观察源很热的情况下使用AsyncSubject类型,并且可以在任何观察者可以订阅它之前完成 . 在这种情况下,AsyncSubject仍然可以提供最后一个值并将其发布给任何未来的订阅者 .

    还有两条评论:

    你的掠夺者

    • this._coldObservable = emitter.share(); . 使用 share 返回一个热的可观察量!

    • EventEmitter 实际上首先扩展了主题

    UPDATE : 围绕 Rx.Observable 包裹一个EventEmitter:

    function toRx ( eventEmitter ) {
      return Rx.Observable.create(function ( observer ) {
        eventEmitter.subscribe(function listener ( value ) {observer.onNext(value)});
        // Ideally you also manage error and completion, if that makes sense with Angular2
        return function () {
          /* manage end of subscription here */
        };
      };
    )
    }
    

    一旦你有 Rx.Observable ,你可以申请 share()shareReplay(1) ,你想要的任何东西 .

    我敢打赌,Angular团队迟早会提出一个brigding功能,但如果你不想等,你可以自己做 .

  • 1

    ReplaySubject 正在做我想要的 . @robwormald提供了一个关于gitter的工作示例,我稍加修改以更好地演示 .

    公开HTTP响应:

    import {Injectable} from 'angular2/angular2';
    import {Http} from 'angular2/http';
    import {ReplaySubject} from '@reactivex/rxjs/dist/cjs/Rx'
    
    @Injectable()
    export class PeopleService {
      constructor(http:Http) {
        this.people = new ReplaySubject(1);
    
        http.get('api/people.json')
          .map(res => res.json())
          .subscribe(this.people);
      }
    }
    

    订阅多次:

    // ... annotations
    export class App {
      constructor(peopleService:PeopleService) {
    
        people.subscribe(v => {
          console.log(v)
        });
    
        //some time later
    
        setTimeout(() => {
          people.subscribe(v => {
            console.log(v)
          });
          people.subscribe(v => {
            console.log(v)
          });
        },2000)
      }
    }
    

    Full plunker

    编辑: BehaviorSubject 是另一种选择 . 在此用例中,差异是初始值,例如,如果我们要在使用HTTP响应更新之前显示缓存中的内容 .

相关问题