首页 文章

Python中的Apache Beam:如何触发空窗口的警报

提问于
浏览
0

我在用Python编写的Apache Beam管道中处理分析命中 . 我正在使用10分钟的FixedWindows,我想在窗口为空时触发警报(例如使用Cloud Pub / Sub) . 到目前为止,这就是我所做的:

ten_min_windows = day_hits | '10MinutesWindows' >> beam.WindowInto(
    beam.window.FixedWindows(10 * 60))

ten_min_alerts = (ten_min_windows
    | 'CountTransactions10Min' >> beam.CombineGlobally(count_transactions).without_defaults()
    | 'KeepZeros10Min' >> beam.Filter(keep_zeros)
    | 'ConvertToAlerts10Min' >> beam.ParDo(ToAlert()))

count_transactions过滤器仅保留事务命中,然后返回结果列表的长度 . 如果结果长度为0,则keep_zeros返回true . 问题是,如果PCollection不包含事务命中,则根本不返回任何长度,并且由于没有默认值,我得到一个空的PCollection . 我似乎无法在没有默认值的情况下取出,因为在使用非全局窗口时不允许这样做 .

我已经看到thread建议在每个窗口添加一个虚拟元素,然后检查计数是否多于一个 .

这是最好的解决方案还是有更好的方法?

我怎么能这样做,因为我需要每个窗口只有一个元素?我可以直接在管道中对此进行编码,还是需要每10分钟安排一次假命中(例如通过Cloud Pub / Sub)?

1 回答

  • 1

    例如,您可以使用 Metrics.counter 来监视在Stackdriver中处理的元素数量 .

    然后,您可以根据自己的规则从您最喜爱的监控工具中设置警报 .

相关问题