Motivation:
目标是编写一个允许提取关系/规范化数据的抓取库 .
为此,有:
-
source streams:发出cheerio文档:通过抓取网址或通过查找父级cheerio文档(例如cheerio文档 - >多个cheerio文档,父级中的每个li文件一个)
-
表流:发出表行,订阅源流组合并从给定的cheerio文档中提取数据
由于复杂数据需要多个表,因此每个源流可能多次订阅并需要共享,因为它包含副作用(爬行)
Example: Blog
在一个简单的博客上可能有帖子,作者和类别;基础数据结构例如是
Post: {id, author, title, text}
Author: {id, name}
Category: {id, name}
author_post: {author_id, post_id}
post_category: {post_id, category_id}
要从已删除的html重新创建数据结构,我们创建了三个源流:
-
帖子:fed post urls,返回cheerio文件的帖子
-
post.author:child to posts,跟随帖子作者的超链接并发出作者的cheerio文档
-
post.category:child to posts,为帖子上列出的每个类别返回一个cheerio文档(例如'.categories li')
要重新创建post_category表,每个帖子必须与属于该帖子的每个类别(== carthesian产品)组合 .
我的实际问题更加人为,因为儿童流已经发出了他们自己的cheerio文件以及他们每个父母的文件,即{post:cheerio,author:cheerio} .
组合流的问题仅出现在例如兄弟姐妹 .
我也无法通过在一个流中发布父节点的所有子节点来避免分组问题(例如{post,author,category}),因为更复杂的数据结构需要祖父母进行分组
(如果需要的话,我可以提供一个例子,但这已经足够长了) .
Problem :
使用groupBy和zip来组合热观察组是不可能的 .
groupBy发出的GroupObservable一旦创建就会发出它们的值,而zip等待所有压缩的observable发出GroupObservables意味着缺少在zip函数运行之前发出的任何值 .
Question :
如何在不丢失值的情况下对热的可观察对象进行分组和压缩?由于可能异步解析(爬行)子节点,因此无法依赖时间信息(例如,在下一个父节点发出之前发出的所有子节点) .
More information:
我能想象到的最好的是:
Child1和Child2是父级的映射版本,C1C2是父级对Child1和Child2进行分组并计算这些组内的笛卡尔积的结果 .
Parent: -1------2-------3--------
Child1: --a------b--c------------
Child2: ---1-2----3---4----------
C1C2: --a1-a2---b3-c3-b4-c4----
形成笛卡尔积本身就没有问题,因为有一个简单的实现(取自RxJ上的问题#807,无法发布更多链接)
function xprod (o1, o2) {
return o1.concatMap(x => o2, (x, y) => [x, y]);
};
问题不是缺少任何值 .
Edit :jsbin显示一个简单的情况:1个父母,2个孩子 .
References :
- question on RxJs Github //删除了其他链接,因为我只能发布2
2 回答
我在这里添加了另一个答案,我对你的问题有了新的认识 . 如果我误解了,我会在以后删除它 .
我的理解是:
有一个父observable生成值
child1使用这些值来生成另一个observable,它表示来自异步操作的值序列($ .get或其他)
与child2相同
父的一个值因此生成两个可观察量,并且您想要这两个可观察量的笛卡尔积 .
该代码应该:
jsbin:https://jsbin.com/revurohoce/1/edit?js,console,https://jsbin.com/wegayacede/edit?js,console
记录:
我想出的解决方案是返回ReplaySubjects的分组操作符的简单实现 . 虽然解决方案特定于我的要求,但一般要点可能会有所帮助: