首页 文章

RxJs分组热观察的笛卡尔积

提问于
浏览
0

Motivation:

目标是编写一个允许提取关系/规范化数据的抓取库 .

为此,有:

  • source streams:发出cheerio文档:通过抓取网址或通过查找父级cheerio文档(例如cheerio文档 - >多个cheerio文档,父级中的每个li文件一个)

  • 表流:发出表行,订阅源流组合并从给定的cheerio文档中提取数据

由于复杂数据需要多个表,因此每个源流可能多次订阅并需要共享,因为它包含副作用(爬行)

Example: Blog

在一个简单的博客上可能有帖子,作者和类别;基础数据结构例如是

Post: {id, author, title, text}
Author: {id, name}
Category: {id, name}
author_post: {author_id, post_id}
post_category: {post_id, category_id}

要从已删除的html重新创建数据结构,我们创建了三个源流:

  • 帖子:fed post urls,返回cheerio文件的帖子

  • post.author:child to posts,跟随帖子作者的超链接并发出作者的cheerio文档

  • post.category:child to posts,为帖子上列出的每个类别返回一个cheerio文档(例如'.categories li')

要重新创建post_category表,每个帖子必须与属于该帖子的每个类别(== carthesian产品)组合 .

我的实际问题更加人为,因为儿童流已经发出了他们自己的cheerio文件以及他们每个父母的文件,即{post:cheerio,author:cheerio} .

组合流的问题仅出现在例如兄弟姐妹 .

我也无法通过在一个流中发布父节点的所有子节点来避免分组问题(例如{post,author,category}),因为更复杂的数据结构需要祖父母进行分组

(如果需要的话,我可以提供一个例子,但这已经足够长了) .

Problem

使用groupBy和zip来组合热观察组是不可能的 .

groupBy发出的GroupObservable一旦创建就会发出它们的值,而zip等待所有压缩的observable发出GroupObservables意味着缺少在zip函数运行之前发出的任何值 .

Question

如何在不丢失值的情况下对热的可观察对象进行分组和压缩?由于可能异步解析(爬行)子节点,因此无法依赖时间信息(例如,在下一个父节点发出之前发出的所有子节点) .

More information:

我能想象到的最好的是:

Child1和Child2是父级的映射版本,C1C2是父级对Child1和Child2进行分组并计算这些组内的笛卡尔积的结果 .

Parent: -1------2-------3--------

Child1: --a------b--c------------

Child2: ---1-2----3---4----------

C1C2:   --a1-a2---b3-c3-b4-c4----

形成笛卡尔积本身就没有问题,因为有一个简单的实现(取自RxJ上的问题#807,无法发布更多链接)

function xprod (o1, o2) {
  return o1.concatMap(x => o2, (x, y) => [x, y]);
};

问题不是缺少任何值 .

Editjsbin显示一个简单的情况:1个父母,2个孩子 .

References

2 回答

  • 0

    我在这里添加了另一个答案,我对你的问题有了新的认识 . 如果我误解了,我会在以后删除它 .

    我的理解是:

    • 有一个父observable生成值

    • child1使用这些值来生成另一个observable,它表示来自异步操作的值序列($ .get或其他)

    • 与child2相同

    • 父的一个值因此生成两个可观察量,并且您想要这两个可观察量的笛卡尔积 .

    该代码应该:

    var parent = Rx.Observable.interval(500)
        .take(3)
        .publish();
    var Obs3Val = function (x, delay) {
      return Rx.Observable.interval(delay)
        .map(function(y){return "P"+x+":"+"C" + y;})
        .take(3);
    };
    function emits(who){
      return function (x) {console.log(who + " emits " + x);};
    }
    
    var child1 = parent.map(x => Obs3Val(x, 200));
    var child2 = parent.map(x => Obs3Val(x, 100));
    
    Rx.Observable.zip(child1, child2)
                 .concatMap(function(zipped){
      var o1 = zipped[0], o2 = zipped[1];
    o1.subscribe(emits("child1"));
    o2.subscribe(emits("child2"));
      return o1.concatMap(x => o2, (x, y) => [x, y]);
    })
        .subscribe(function(v) {
                     console.log('cartesian product : ' +v);
                   });
    
    parent.subscribe();
    
    parent.connect();
    

    jsbin:https://jsbin.com/revurohoce/1/edit?js,consolehttps://jsbin.com/wegayacede/edit?js,console

    记录:

    "child2 emits P0:C0"
    "child1 emits P0:C0"
    "child2 emits P0:C1"
    "cartesian product : P0:C0,P0:C0"
    "child2 emits P0:C2"
    "child1 emits P0:C1"
    "cartesian product : P0:C0,P0:C1"
    "cartesian product : P0:C0,P0:C2"
    "child2 emits P1:C0"
    "child1 emits P0:C2"
    "cartesian product : P0:C1,P0:C0"
    "child1 emits P1:C0"
    "child2 emits P1:C1"
    "cartesian product : P0:C1,P0:C1"
    "child2 emits P1:C2"
    "cartesian product : P0:C1,P0:C2"
    "child1 emits P1:C1"
    "cartesian product : P0:C2,P0:C0"
    "cartesian product : P0:C2,P0:C1"
    "child1 emits P1:C2"
    "child2 emits P2:C0"
    "cartesian product : P0:C2,P0:C2"
    "child1 emits P2:C0"
    "child2 emits P2:C1"
    "child2 emits P2:C2"
    "child1 emits P2:C1"
    "cartesian product : P1:C0,P1:C0"
    "cartesian product : P1:C0,P1:C1"
    "child1 emits P2:C2"
    "cartesian product : P1:C0,P1:C2"
    "cartesian product : P1:C1,P1:C0"
    "cartesian product : P1:C1,P1:C1"
    "cartesian product : P1:C1,P1:C2"
    "cartesian product : P1:C2,P1:C0"
    "cartesian product : P1:C2,P1:C1"
    "cartesian product : P1:C2,P1:C2"
    "cartesian product : P2:C0,P2:C0"
    "cartesian product : P2:C0,P2:C1"
    "cartesian product : P2:C0,P2:C2"
    "cartesian product : P2:C1,P2:C0"
    "cartesian product : P2:C1,P2:C1"
    "cartesian product : P2:C1,P2:C2"
    "cartesian product : P2:C2,P2:C0"
    "cartesian product : P2:C2,P2:C1"
    "cartesian product : P2:C2,P2:C2"
    
  • 0

    我想出的解决方案是返回ReplaySubjects的分组操作符的简单实现 . 虽然解决方案特定于我的要求,但一般要点可能会有所帮助:

    Observable.prototype.splitBy = function(keySelector) {
      const parentObservable = this;
      let group, lastKey;
      return Observable.create(observable => {
        return parentObservable.subscribe(
          value => {
            const currentKey = keySelector(value);
            if(currentKey === lastKey) {
              group.next(value);
            } else {
              if(group) group.complete();
              group = new ReplaySubject();
              observable.next(group);
              group.key = currentKey;
              group.next(value);
            }
            lastKey = currentKey;
          },
          error => observable.error(error),
          completed => {
            group.complete();
            observable.complete();
          });
      });
    };
    

相关问题