首页 文章

RXJS控制可观察的调用

提问于
浏览
14

我在Angular 2项目中使用RxJs version 5 . 我想创建一些observable,但我不希望立即调用observable .

version 4中,您可以使用(例如)Controlled命令或Pausable Buffers来控制调用 . 但是该功能在版本5中不是(yet) .

如何在RxJs 5中获得这种功能?

我的最终目标是对创建的observable进行排队并逐个调用它们 . 只有在成功处理上一个时才会调用下一个 . 当一个失败时,队列被清空 .

EDIT

随着@Niklas Fasching的评论,我可以使用Publish操作创建一个有效的解决方案 .

JS Bin

// Queue to queue operations
const queue = [];

// Just a function to create Observers
function createObserver(id): Observer {
    return {
        next: function (x) {
            console.log('Next: ' + id + x);
        },
        error: function (err) {
            console.log('Error: ' + err);
        },
        complete: function () {
            console.log('Completed');
        }
    };
};

// Creates an async operation and add it to the queue
function createOperation(name: string): Observable {

  console.log('add ' + name);
  // Create an async operation
  var observable = Rx.Observable.create(observer => {
    // Some async operation
    setTimeout(() => 
               observer.next(' Done'), 
               500);
  });
  // Hold the operation
  var published = observable.publish();
  // Add Global subscribe
  published.subscribe(createObserver('Global'));
  // Add it to the queue
  queue.push(published);
  // Return the published so the caller could add a subscribe
  return published;
};

// Create 4 operations on hold
createOperation('SourceA').subscribe(createObserver('SourceA'));
createOperation('SourceB').subscribe(createObserver('SourceB'));
createOperation('SourceC').subscribe(createObserver('SourceC'));
createOperation('SourceD').subscribe(createObserver('SourceD'));

// Dequeue and run the first
queue.shift().connect();

2 回答

  • 19

    您可以通过publishing observable将observable的开头从订阅分开 . 只有在调用connect之后才会启动已发布的observable .

    请注意,所有订阅者将共享对可观察序列的单个订阅 .

    var published = Observable.of(42).publish();
    // subscription does not start the observable sequence
    published.subscribe(value => console.log('received: ', value));
    // connect starts the sequence; subscribers will now receive values
    published.connect();
    
  • 6

    在订阅时仍会调用Rx4的受控Observable

    RxJS 4中的 controlled 运算符实际上只是在运算符之后控制Observable的流量 . 到目前为止,它都通过并缓冲该运营商 . 考虑一下:

    (RxJS 4)http://jsbin.com/yaqabe/1/edit?html,js,console

    const source = Rx.Observable.range(0, 5).do(x => console.log('do' + x)).controlled();
    
    source.subscribe(x => console.log(x));
    
    setTimeout(() => {
      console.log('requesting');
      source.request(2);
    }, 1000);
    

    您会注意到 Observable.range(0, 5) 中的所有五个值都会立即由 do 发出...然后在获得两个值之前暂停一秒(1000毫秒) .

    所以,'s really the illusion of backpressure control. In the end, there'是该运算符中的无界缓冲区 . 一个数组正在收集Observable "above"发送的所有内容并等待您通过调用 request(n) 将其出列 .


    RxJS 5.0.0-beta.2复制受控制

    在这个回答的时候,RxJS 5中不存在 controlled 运算符 . 这有几个原因:1 . 没有请求它,2 . 它的名字显然令人困惑(因此StackOverflow上的这个问题)

    如何在RxJS 5中复制行为(暂时):http://jsbin.com/metuyab/1/edit?html,js,console

    // A subject we'll use to zip with the source
    const controller = new Rx.Subject();
    
    // A request function to next values into the subject
    function request(count) {
      for (let i = 0; i < count; i++) {
        controller.next(count);
      }
    }
    
    // We'll zip our source with the subject, we don't care about what
    // comes out of the Subject, so we'll drop that.
    const source = Rx.Observable.range(0, 5).zip(controller, (x, _) => x);
    
    // Same effect as above Rx 4 example
    source.subscribe(x => console.log(x));
    
    // Same effect as above Rx 4 example
    request(3);
    

    背压控制

    对于“真正的背压控制”,一个解决方案是承诺的迭代器 . IoP并非没有问题,但有一点,每回合都有一个对象分配 . 每个值都有一个与之关联的Promise . 另一方面,取消不存在,因为它是承诺 .

    一个更好的,基于Rx的方法是让一个主题“提供”你的可观察链的顶部,然后你在其余的组成 .

    像这样:http://jsbin.com/qeqaxo/2/edit?js,console

    // start with 5 values
    const controller = new Rx.BehaviorSubject(5);
    
    // some observable source, in this case, an interval.
    const source = Rx.Observable.interval(100)
    
    const controlled = controller.flatMap(
          // map your count into a set of values
          (count) => source.take(count), 
          // additional mapping for metadata about when the block is done
          (count, value, _, index) => {
            return { value: value, done: count - index === 1 }; 
          })
          // when the block is done, request 5 more.
          .do(({done}) => done && controller.next(5))
          // we only care about the value for output
          .map(({value}) => value);
    
    
    // start our subscription
    controlled.subscribe(x => {
      console.log(x)
    });
    

    ...我们有一些可流动的可观察类型的计划,在不久的将来也会有真正的背压控制 . 对于这种情况,这将更令人兴奋和更好 .

相关问题