首页 文章

在从Dataflow插入BigQuery之前验证行

提问于
浏览
5

根据How do we set maximum_bad_records when loading a Bigquery table from dataflow?,当从Dataflow将数据加载到BigQuery时,目前无法设置 maxBadRecords 配置 . 建议在将数据插入BigQuery之前验证Dataflow作业中的行 .

如果我有 TableSchemaTableRow ,我该如何确保可以安全地将行插入表中?

必须有一种更简单的方法来做到这一点,而不是迭代模式中的字段,查看它们的类型并查看行中值的类,对吧?这似乎容易出错,并且该方法必须是万无一失的,因为如果无法加载单行,整个管道就会失败 .

更新:

我的用例是一个ETL作业,最初将在JSON上运行(每行一个对象)登录 Cloud 存储并批量写入BigQuery,但稍后将从PubSub读取对象并连续写入BigQuery . 这些对象包含很多BigQuery中不需要的信息,还包含甚至无法在模式中描述的部分(基本上是自由形式的JSON有效负载) . 像时间戳这样的东西也需要格式化以与BigQuery一起使用 . 这个作业的一些变体会在不同的输入上运行并写入不同的表 .

从理论上讲,这不是一个非常困难的过程,它需要一个对象,提取一些属性(50-100),格式化其中一些并将对象输出到BigQuery . 我或多或少只是循环遍历属性名称列表,从源对象中提取值,查看配置以查看属性是否应该以某种方式格式化,如果需要应用格式(这可能是下行,划分毫秒时间戳) 1000,从URL中提取主机名等),并将值写入 TableRow 对象 .

我的问题是数据混乱 . 有几亿个物体有一些看起来并不像预期的那样,这种情况很少见,但是这些物品仍然很少见 . 有时,应包含字符串的属性包含整数,反之亦然 . 有时会有一个数组或一个应该有字符串的对象 .

理想情况下,我想把我的 TableRow 传递给 TableSchema 然后问"does this work?" .

因为这是不可能的,所以我做的是查看 TableSchema 对象并尝试自己验证/转换值 . 如果 TableSchema 表示属性类型为 STRING ,则在将其添加到 TableRow 之前运行 value.toString() . 如果它是 INTEGER 我检查它是 IntegerLongBigInteger ,依此类推 . 这种方法的问题在于我只是猜测BigQuery会起什么作用 . FLOAT 会接受哪些Java数据类型?对于 TIMESTAMP ?我认为我的验证/演员表可以解决大多数问题,但总有例外和边缘情况 .

根据我的经验,这是非常有限的,整个工作流程(工作?工作流程?不确定正确的术语)如果单行失败BigQuery的验证失败(就像常规加载一样,除非 maxBadRecords 设置为足够大的数字) . 它也失败了表面有用的消息,如'BigQuery import job 1438818 failed. Causes: (5db0b2cdab1557e0): BigQuery job 1438819 in project 1438820 finished with error(s): errorResult: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field, error: JSON map specified for non-record field' . 也许在哪里可以看到更详细的错误消息,可以告诉我它是哪个属性, Value 是什么?没有这些信息,它也可以说"bad data" .

据我所知,至少在批处理模式下运行时,Dataflow会将 TableRow 对象写入 Cloud 存储中的暂存区域,然后一旦所有内容都启动加载 . 这意味着我无处可捕获任何错误,我的代码在加载BigQuery时不再运行 . 我不知道那里会有什么不同,从我(无可否认有限)理解基本原理是相同的,它更小 .

人们使用Dataflow和BigQuery,因此,如果不必担心由于单个错误输入而导致整个管道停止,就不可能完成这项工作 . 人们如何做到这一点?

1 回答

  • 7

    我假设您将文件中的JSON反序列化为 Map<String, Object> . 然后你应该能够递归地使用 TableSchema 进行类型检查 .

    我建议使用迭代方法开发模式验证,并执行以下两个步骤 .

    • 编写一个 PTransform<Map<String, Object>, TableRow> ,将您的JSON行转换为 TableRow 个对象 . TableSchema 也应该是函数的构造函数参数 . 你可以开始使这个功能非常严格 - 要求JSON直接将输入解析为Integer,例如,当找到BigQuery INTEGER模式时 - 并积极地声明错误记录 . 基本上,确保在处理时超级严格不会输出无效记录 .

    我们的code here做了类似的事情 - 给定一个由BigQuery生成并作为JSON写入GCS的文件,我们递归地遍历模式并进行一些类型的转换 . 但是,我们不需要验证,因为BigQuery本身编写了数据 .

    请注意 TableSchema 对象不是 Serializable . 我们已经将 DoFnPTransform 构造函数中的 TableSchema 转换为JSON String 并返回 . 见the code in BigQueryIO.java that uses the jsonTableSchema variable .

    • 使用此blog post中描述的"dead-letter"策略处理错误记录 - 从PTransform输出有问题的 Map<String, Object> 行并将其写入文件 . 这样,您可以检查稍后验证失败的行 .

    您可以从一些小文件开始并使用 DirectPipelineRunner 而不是 DataflowPipelineRunner . 直接运行程序在您的计算机上运行管道,而不是在Google Cloud Dataflow服务上运行,它使用BigQuery流式写入 . 我相信当这些写入失败时,您将获得更好的错误消息 .

    (我们对批处理作业使用GCS-> BigQuery负载作业模式,因为它更有效,更具成本效益,但BigQuery流在Streaming作业中写入,因为它们是低延迟的 . )

    最后,在记录信息方面:

    • 绝对检查Cloud Logging(按照日志面板上的 Worker Logs 链接) .

    • 如果运行bq command-line utilitybq show -j PROJECT:dataflow_job_XXXXXXX ,您可能会获得有关批量数据流触发的加载作业失败原因的更好信息 .

相关问题