首页 文章

Apache Beam中的嵌套管道

提问于
浏览
1

我的Apache Beam管道获取无限的消息流 . 每条消息都会扇出N个元素(N为~1000,每个输入都不同) . 然后,对于前一阶段生成的每个元素,都有一个生成新N元素的映射操作,应该使用前1个操作来减少这些元素(元素按照从队列中读取的原始消息进行分组) . top 1的结果保存到外部存储器 . 在Spark中,我可以通过从流中读取消息并为每个执行map reduce的消息创建RDD来轻松完成 . 由于Apache Beam没有嵌套管道,我无法看到在Beam中使用无限流输入实现它的方法 . 例:

Infinite stream elements: A, B

Step 1 (fan out, N = 3): A -> A1, A2, A3
                (N = 2): B -> B1, B2

Step 2 (map): A1, A2, A3 -> A1', A2', A3'
              B1, B2, B3 -> B1', B2'

Step 3 (top1): A1', A2', A3' -> A2'
               B1', B2' -> B3'

Output: A2', B2'

A和B元素之间没有依赖关系 . A2'和B2'是其组的顶级元素 . 流是无限的 . Map 操作可能需要几秒钟到几分钟 . 创建窗口水印以获得执行 Map 操作所需的最长时间将使整个管道时间对于快速 Map 操作而言要慢得多 . 嵌套管道会有所帮助,因为这样我可以为每条消息创建一个管道 .

2 回答

  • 0

    看起来你不需要'嵌套管道' . 让我向您展示Beam Python SDK中的内容(它与Java类似):

    例如,尝试将数字和撇号附加到字符串的虚拟操作(例如 "A" => "A1'" ),您可以执行以下操作:

    def my_fn(value):
      def _inner(elm):
        return (elm, elm + str(value) + "'")  # A KV-pair
      return _inner
    
    # my_stream has [A, B]
    pcoll_1 = (my_stream
               | beam.Map(my_fn(1)))
    pcoll_2 = (my_stream
               | beam.Map(my_fn(2)))
    pcoll_3 = (my_stream
               | beam.Map(my_fn(3)))
    
    def top_1(elms):
      ... # Some operation
    
    result = ((pcoll_1, pcoll_2, pcoll_3)
              | beam.CoGroupByKey()
              | beam.Map(top_1))
    
  • -1

    所以这是一种有效的解决方案 . 我很可能正在编辑它,因为我在理解这个问题时可能会犯任何错误 . (P.s.模板代码在java中) . 假设 input 是您的流源

    PCollection<Messages> msgs = input.apply(Window.<Messages>into(        
                                        FixedWindows.of(Duration.standardSeconds(1)) 
                                                    .triggering(AfterWatermark.pastEndOfWindow()
                                             // fire the moment you see an element 
                                                       .withEarlyFirings(AfterPane.elementCountAtLeast(1))
                                             //optional since you have small window 
                                                       .withLateFirings(AfterProcessingTime.pastFirstElementInPane()))
                                                    .withAllowedLateness(Duration.standardMinutes(60))
                                                    .discardingFiredPanes());
    

    这将允许您读取 Messages 的流,该流可以是字符串或HashMap甚至是列表 . 观察到你正在告诉光束为它接收到的每个元素触发一个窗口,并且你设置了最大窗口为1秒 . 如果要每10条消息和一分钟的窗口等等,您可以更改此设置 .

    之后,您需要编写2个主要扩展DoFn的类

    PCollection<Element> top = msgs.apply(ParDo.of(new ExtractElements()))
                                   .apply(ParDo.of(new TopElement()));
    

    Element 可以是String,int,double等 .

    最后,您可以通过以下方式存储 each Element

    top.apply(ParDo.of(new ParsetoString()))
       .apply(TextIO.write().withWindowedWrites()
                            .withNumShards(1)
                            .to(filename));
    

    因此,对于每条可能很多的消息,您将有大约1个文件 . 但遗憾的是你无法附加档案 . 除非你做一个窗口,你将所有元素组合成一个列表并写入 .

    当然,有没有窗口的hacky方法,我将解释这个用例似乎不适合你(或如果你很好奇)

    如果我遗漏了什么,请告诉我! :)

相关问题