根据How do we set maximum_bad_records when loading a Bigquery table from dataflow?,当从Dataflow将数据加载到BigQuery时,目前无法设置 maxBadRecords
配置 . 建议在将数据插入BigQuery之前验证Dataflow作业中的行 .
如果我有 TableSchema
和 TableRow
,我该如何确保可以安全地将行插入表中?
必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败 .
更新:
我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录 Cloud 存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery . 这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载) . 像时间戳这样的东西也需要格式化以与BigQuery一起使用 . 这个作业的一些变体会在不同的输入上运行并写入不同的表 .
从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery . 我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入 TableRow
对象 .
我的问题是数据混乱 . 有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见 . 有时,应包含字符串的属性包含整数,反之亦然 . 有时会有一个数组或一个应该有字符串的对象 .
理想情况下,我想把我的 TableRow
传递给 TableSchema
然后问"does this work?" .
因为这是不可能的,所以我做的是查看 TableSchema
对象并尝试自己验证/转换值 . 如果 TableSchema
表示属性类型为 STRING
,则在将其添加到 TableRow
之前运行 value.toString()
. 如果它是 INTEGER
我检查它是 Integer
, Long
或 BigInteger
,依此类推 . 这种方法的问题在于我只是猜测BigQuery会起什么作用 . FLOAT
会接受哪些Java数据类型?对于 TIMESTAMP
?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况 .
根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非 maxBadRecords
设置为足够大的数字) . 它也失败了表面有用的消息,如'BigQuery import job 1438818 failed. Causes: (5db0b2cdab1557e0): BigQuery job 1438819 in project 1438820 finished with error(s): errorResult: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field' . 也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性, Value 是什么?没有这些信息,它也可以说"bad data" .
据我所知,至少在批处理模式下运行时,Dataflow会将 TableRow
对象写入 Cloud 存储中的暂存区域,然后一旦所有内容都启动加载 . 这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行 . 我不知道那里会有什么不同,从我(无可否认有限)理解基本原理是相同的,它更小 .
人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作 . 人们如何做到这一点?
1 回答
我假设您将文件中的JSON反序列化为
Map<String, Object>
. 然后你应该能够递归地使用TableSchema
进行类型检查 .我建议使用迭代方法开发模式验证,并执行以下两个步骤 .
PTransform<Map<String, Object>, TableRow>
,将您的JSON行转换为TableRow
个对象 .TableSchema
也应该是函数的构造函数参数 . 你可以开始使这个功能非常严格 - 要求JSON直接将输入解析为Integer,例如,当找到BigQuery INTEGER模式时 - 并积极地声明错误记录 . 基本上,确保在处理时超级严格不会输出无效记录 .我们的code here做了类似的事情 - 给定一个由BigQuery生成并作为JSON写入GCS的文件,我们递归地遍历模式并进行一些类型的转换 . 但是,我们不需要验证,因为BigQuery本身编写了数据 .
请注意
TableSchema
对象不是Serializable
. 我们已经将DoFn
或PTransform
构造函数中的TableSchema
转换为JSONString
并返回 . 见the code in BigQueryIO.java that uses the jsonTableSchema variable .Map<String, Object>
行并将其写入文件 . 这样,您可以检查稍后验证失败的行 .您可以从一些小文件开始并使用
DirectPipelineRunner
而不是DataflowPipelineRunner
. 直接运行程序在您的计算机上运行管道,而不是在Google Cloud Dataflow服务上运行,它使用BigQuery流式写入 . 我相信当这些写入失败时,您将获得更好的错误消息 .(我们对批处理作业使用GCS-> BigQuery负载作业模式,因为它更有效,更具成本效益,但BigQuery流在Streaming作业中写入,因为它们是低延迟的 . )
最后,在记录信息方面:
绝对检查Cloud Logging(按照日志面板上的
Worker Logs
链接) .如果运行bq command-line utility:
bq show -j PROJECT:dataflow_job_XXXXXXX
,您可能会获得有关批量数据流触发的加载作业失败原因的更好信息 .