我有一个Dataflow管道从PubSub主题读取事件数据 . 收到消息时,我会执行转换步骤,以使事件数据适合我想要的BigQuery架构 . 但是,如果我创建的输入不适合模式,我会遇到问题 . 显然,写入BigQuery是无限重试的:
Count: 76 RuntimeError: Could not successfully insert rows to BigQuery table
目前我正在进行大量的手动检查,输入确实符合架构,但是,在我没有考虑的情况下,我会积累RuntimeErrors . 有没有办法尝试写入BigQuery,以防万一用原始输入执行其他操作失败?或者,有没有办法尝试多次写入,否则无需添加新的RuntimeErrors就会无声地失败?
Edit: 我正在使用python SDK . 以下是我的简化管道以进一步说明:
with beam.Pipeline(options=options) as pipeline:
# Read messages from PubSub
event = (pipeline
| 'Read from PubSub' >> beam.io.gcp.pubsub.ReadStringsFromPubSub(topic))
output = (event
| 'Create output' >> beam.transforms.core.FlatMap(lambda event: [{'input': event}]))
# Write to Big Query
_ = (output
| 'Write log to BigQuery' >> beam.io.gcp.bigquery.WriteToBigQuery(
table=table,
dataset=dataset,
project=project,
schema=schema,
create_disposition=beam.io.gcp.bigquery.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.gcp.bigquery.BigQueryDisposition.WRITE_APPEND))
如果我的表中没有列'input',则作业将死亡 . 看了https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1279之后,这就是这种行为的原因 . 通过自定义https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/gcp/bigquery.py#L1187而不是引发RuntimeError,我可以克服我的问题,但是,这感觉非常麻烦 . 有人建议采用更简单的方法吗?
2 回答
如果你自己编写了管道,你应该可以在BigQueryIO上使用setFailedInsertRetryPolicy来InsertRetryPolicy.neverRetry
用于流式传输的Beam - Python SDK非常有限 .
https://beam.apache.org/documentation/sdks/python-streaming/
Python流执行当前不支持以下功能 .
General Beam features :这些不受支持的Beam功能适用于所有跑步者 .
State和Timers API
自定义源API
Splittable DoFn API
处理后期数据
用户定义的自定义WindowFn
DataflowRunner specific features :此外,DataflowRunner目前不支持Python流执行的以下Cloud Dataflow特定功能 .
流式自动缩放
更新现有管道
Cloud 数据流模板
某些监视功能,例如msec计数器,显示数据,度量标准和变换的元素计数 . 但是,支持源的日志记录,水印和元素计数 .
更多信息:https://beam.apache.org/documentation/sdks/python-streaming/#unsupported-features
另请查看DataFlow文档中的以下发行说明: