我的Apache Beam管道获取无限的消息流 . 每条消息都会扇出N个元素(N为~1000,每个输入都不同) . 然后,对于前一阶段生成的每个元素,都有一个生成新N元素的映射操作,应该使用前1个操作来减少这些元素(元素按照从队列中读取的原始消息进行分组) . top 1的结果保存到外部存储器 . 在Spark中,我可以通过从流中读取消息并为每个执行map reduce的消息创建RDD来轻松完成 . 由于Apache Beam没有嵌套管道,我无法看到在Beam中使用无限流输入实现它的方法 . 例:
Infinite stream elements: A, B
Step 1 (fan out, N = 3): A -> A1, A2, A3
(N = 2): B -> B1, B2
Step 2 (map): A1, A2, A3 -> A1', A2', A3'
B1, B2, B3 -> B1', B2'
Step 3 (top1): A1', A2', A3' -> A2'
B1', B2' -> B3'
Output: A2', B2'
A和B元素之间没有依赖关系 . A2'和B2'是其组的顶级元素 . 流是无限的 . Map 操作可能需要几秒钟到几分钟 . 创建窗口水印以获得执行 Map 操作所需的最长时间将使整个管道时间对于快速 Map 操作而言要慢得多 . 嵌套管道会有所帮助,因为这样我可以为每条消息创建一个管道 .
2 回答
看起来你不需要'嵌套管道' . 让我向您展示Beam Python SDK中的内容(它与Java类似):
例如,尝试将数字和撇号附加到字符串的虚拟操作(例如
"A"
=>"A1'"
),您可以执行以下操作:所以这是一种有效的解决方案 . 我很可能正在编辑它,因为我在理解这个问题时可能会犯任何错误 . (P.s.模板代码在java中) . 假设
input
是您的流源这将允许您读取
Messages
的流,该流可以是字符串或HashMap甚至是列表 . 观察到你正在告诉光束为它接收到的每个元素触发一个窗口,并且你设置了最大窗口为1秒 . 如果要每10条消息和一分钟的窗口等等,您可以更改此设置 .之后,您需要编写2个主要扩展DoFn的类
Element
可以是String,int,double等 .最后,您可以通过以下方式存储 each
Element
:因此,对于每条可能很多的消息,您将有大约1个文件 . 但遗憾的是你无法附加档案 . 除非你做一个窗口,你将所有元素组合成一个列表并写入 .
当然,有没有窗口的hacky方法,我将解释这个用例似乎不适合你(或如果你很好奇)
如果我遗漏了什么,请告诉我! :)