首页 文章

在Dataflow管道中写入BigQuery时捕获失败

提问于
浏览
0

我有一个Dataflow管道从PubSub主题读取事件数据 . 收到消息时,我会执行转换步骤,以使事件数据适合我想要的BigQuery架构 . 但是,如果我创建的输入不适合模式,我会遇到问题 . 显然,写入BigQuery是无限重试的:

Count: 76   RuntimeError: Could not successfully insert rows to BigQuery table

目前我正在进行大量的手动检查,输入确实符合架构,但是,在我没有考虑的情况下,我会积累RuntimeErrors . 有没有办法尝试写入BigQuery,以防万一用原始输入执行其他操作失败?或者,有没有办法尝试多次写入,否则无需添加新的RuntimeErrors就会无声地失败?

Edit: 我正在使用python SDK . 以下是我的简化管道以进一步说明:

with beam.Pipeline(options=options) as pipeline:
    # Read messages from PubSub
    event = (pipeline
             | 'Read from PubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic))

    output = (event
              | 'Create output' >> beam.transforms.core.FlatMap(lambda event: [{'input': event}]))

    # Write to Big Query
    _ = (output
         | 'Write log to BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
             table=table,
             dataset=dataset,
             project=project,
             schema=schema,
             create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_NEVER,
             write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))

如果我的表中没有列'input',则作业将死亡 . 看了https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1279之后,这就是这种行为的原因 . 通过自定义https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1187而不是引发RuntimeError,我可以克服我的问题,但是,这感觉非常麻烦 . 有人建议采用更简单的方法吗?

2 回答

  • 0

    如果你自己编写了管道,你应该可以在BigQueryIO上使用setFailedInsertRetryPolicyInsertRetryPolicy.neverRetry

  • 0

    用于流式传输的Beam - Python SDK非常有限 .

    https://beam.apache.org/documentation/sdks/python-streaming/

    从Python SDK版本2.5.0开始,Python流管道执行是实验可用的(有一些限制) .

    Python流执行当前不支持以下功能 .

    General Beam features :这些不受支持的Beam功能适用于所有跑步者 .

    • State和Timers API

    • 自定义源API

    • Splittable DoFn API

    • 处理后期数据

    • 用户定义的自定义WindowFn

    DataflowRunner specific features :此外,DataflowRunner目前不支持Python流执行的以下Cloud Dataflow特定功能 .

    • 流式自动缩放

    • 更新现有管道

    • Cloud 数据流模板

    • 某些监视功能,例如msec计数器,显示数据,度量标准和变换的元素计数 . 但是,支持源的日志记录,水印和元素计数 .

    更多信息:https://beam.apache.org/documentation/sdks/python-streaming/#unsupported-features

    另请查看DataFlow文档中的以下发行说明:
    Python Dataflow Streaming limitation

相关问题