首页 文章

SwitchMap和PublishReplay组合使用

提问于
浏览
2

我正在开发一个Angular项目,我不完全理解为什么这段代码片段没有按预期工作:

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger http call")) //only triggered once
  .publishReplay(1).refCount();

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .switchMap(() => this.httpObs);

const c1 = observable.subscribe(() => console.log("subscriber 1"));
const c2 = observable.subscribe(() => console.log("subscriber 2"));

JS Bin

我不明白为什么“触发http调用”不会被Rx.Observable.timer发出的每个事件触发 . 我的期望是每个发出的事件都会尽快触发http调用,只要一个订阅者订阅了observable . 但是,每个用户接收相同的发射,这意味着每个事件只有一个http呼叫 .

我知道我可以通过在observable上使用publishReplay引用来修复它(见下文),但这只适用于这个简单的例子 .

我真正想要实现的是两个组件使用相同的http调用响应,每个组件发出一个onInit事件,然后switchMapped到http响应 . 我不希望每个新订阅者都触发该呼叫 . 但是,我希望在发出另一个事件时触发调用 . onInit observable与click-observable合并,只要用户点击“update”按钮,就会在使用switchMap运算符转换之前发出新事件 . 问题是,更新按钮不再触发http调用 .

此示例显示了它应该如何表现:

const httpObs = Rx.Observable.of("")
  .do(() => console.log("trigger http call"));

let observable = Rx.Observable.timer(0,1000)
  .do((e) => console.log("new event: " + e))
  .switchMap(() => this.httpObs)
  .publishReplay(1);

observable.subscribe(() => console.log("subscriber 1"));
observable.subscribe(() => console.log("subscriber 2"))

observable.connect();

JS Bin

1 回答

  • 2

    您遇到的问题是所有Subject实例都有其内部状态,当它们收到 completeerror 通知时,它们将自己标记为"stopped"并且永远不会发出任何内容(这是所谓的Observable Contract 的要求) . 你可以看看我前段时间写的这篇文章https://medium.com/@martin.sikora/rxjs-subjects-and-their-internal-state-7cfdee905156 . 它涉及非常相似的话题 .

    在您的代码中,您多次使用相同的 httpObs ,但源Observable是 of() ,它发出单个值,然后发送 complete 通知 .

    看到它真的被称为:http://jsbin.com/fuvuxax/2/edit?js,console

    const httpObs = Rx.Observable.of("")
      .do(() => console.log("trigger call"), undefined, () => console.log('complete'))
      .publishReplay(1).refCount();
    

    这意味着 the Subject inside publishReplay received the complete notification 并且不会再次订阅其源Observable . 但是,由于 publishReplay 在内部使用 ReplaySubject ,它仍会刷新其缓冲区然后发送 complete 通知 . 但这对你来说并不是很有用,因为我认为你想再次执行HTTP调用而不只是重放过时的HTTP调用 .

    您可以做的是将 httpObs 转换为每次 switchMap 收到值时创建链的方法:

    const httpObs = () => Rx.Observable.of("")
      .do(() => console.log("trigger call"), undefined, () => console.log('complete'));
    
    let observable = Rx.Observable.timer(0,1000)
      .do((e) => console.log("new event: " + e))
      .take(3)
      .switchMap(() => this.httpObs())
      .publishReplay(1).refCount();
    
    observable.subscribe(() => console.log("subscriber 1"));
    observable.subscribe(() => console.log("subscriber 2"));
    

    你可以看到这只打印 "trigger call" 只有三次,如果我理解你的描述,我认为你想要的是:

    "trigger call"
    "subscriber 1"
    "subscriber 2"
    "complete"
    "trigger call"
    "subscriber 1"
    "subscriber 2"
    "complete"
    "trigger call"
    "subscriber 1"
    "subscriber 2"
    "complete"
    

相关问题