首页 文章

Apache Beam with Dataflow - 从BigQuery读取时为Nullpointer

提问于
浏览
2

我正在使用从BigQuery表和文件中读取的apache beam编写的google数据流运行 . 转换数据并将其写入其他BigQuery表 . 作业“通常”成功,但有时我从大查询表中读取并且我的作业失败时随机获取nullpointer异常:

(288abb7678892196): java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.split(BigQuerySourceBase.java:98)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.splitAndValidate(WorkerCustomSources.java:261)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitTyped(WorkerCustomSources.java:209)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitWithApiLimit(WorkerCustomSources.java:184)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplit(WorkerCustomSources.java:161)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSourceOperationExecutor.execute(WorkerCustomSourceOperationExecutor.java:47)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:341)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:297)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我无法弄清楚这与之相关 . 当我清除临时目录并重新上载我的模板时,作业再次通过 .

我从BQ读取的方式只是:

BigQueryIO.read().fromQuery()

我非常感谢任何帮助 .

任何人?

3 回答

  • 2

    我最终在google issuetracker中添加了bug . 经过与谷歌员工的长时间对话以及他们的调查后发现,使用从BigQuery读取的数据流批处理作业的模板是没有意义的,因为您只能执行一次 .

    引用:“对于BigQuery批处理管道,模板只能执行一次,因为BigQuery作业ID是在模板创建时设置的 . 这个限制将在SDK 2的未来版本中删除,但是当我不能说时 . 创建模板:https://cloud.google.com/dataflow/docs/templates/creating-templates#pipeline-io-and-runtime-parameters

    如果错误比NullpointerException更清楚,那仍然会很好 .

    无论如何,我希望将来帮助某人 .

    如果有人对整个对话感兴趣,这就是问题所在:https://issuetracker.google.com/issues/63124894

  • 3

    我也遇到了这个问题,经过深入研究后发现版本2.2.0中的限制已被删除 . 但是,它还没有正式发布 . 您可以在JIRA project上查看此版本的进度(似乎只剩下一个问题) .

    但是如果你现在想要使用它,你可以自己编译,这并不困难 . 只需从他们的github mirror,checkout标签 v2.2.0-RC4 签出源代码,然后运行 mvn clean install . 然后只需修改 pom.xml 中的项目依赖项,即指向版本 2.2.0 .

    从2.2.0开始,如果要使用 BigQueryIO 作为模板,则需要调用 withTemplateCompatibility()

    BigQueryIO
        .readTableRows() // read() has been deprecated in 2.2.0
        .withTemplateCompatibility() // You need to add this
        .fromQuery(options.getInputQuery())
    

    我目前正在为我的项目使用2.2.0,到目前为止工作正常 .

  • 1

    好的,让我提供一些细节 .

    • 作业上传为模板并在Google数据流上运行

    • 工作通常成功 - 这就是我怀疑实际代码有问题的原因 . 异常来自源代码,它看起来像: bqServices.getDatasetService(bqOptions) 在BigQuerySourceBase中返回null

    • 是的我确实提供了实际的查询

    以下是我工作的DAG . 正如您所看到的,这次运行成功了 . 它处理了从BQ导出的超过2百万行,从csv文件导出了1.5百万行,并将800k写回BigQuery(数字是正确的) . 这项工作基本上按预期工作(当它工作时) . 左上角(读取事务)是在BQ上执行查询的步骤 . 而且有一步没有理由就失败了 .

    Successful run - Beam DAG

    当BQ源上的Nullpointer失败时,下面是相同的工作 .

    Failed run - Beam DAG

    我不确定在这种情况下代码片段会有多大帮助,但这是执行查询的一部分:

    PCollection<Transaction> transactions = p.apply("Read Transactions", BigQueryIO.read().fromQuery(createTransactionQuery(options)))
                                            .apply("Map to Transaction", MapElements.via(new TableRowToTransactionFn()));
    
        PCollection<KV<String, Transaction>> transactionsPerMtn = 
                transactions.apply("Filter Transactions Without MTN", Filter.by(t -> t.transactionMtn != null))
                            .apply("Map Transactions to MTN key", MapElements.into(
                        TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Transaction.class)))
                                        .via(t -> KV.of(t.transactionMtn, t)));
    

    并在获取查询的方法下面:

    private ValueProvider<String> createTransactionQuery(TmsPipelineOptions options) {
        return NestedValueProvider.of(options.getInputTransactionTable(), table -> {
            StringBuilder sb = new StringBuilder();
            sb.append(
                    "SELECT transaction_id, transaction_mtn, transaction_folio_number, transaction_payer_folio_number FROM ");
            sb.append(table);
            return sb.toString();
        });
    }
    

    我相信大查询源中存在某种错误,导致类似的问题 . 我无法确定造成这种情况的原因,因为它是随机发生的 . 就像我写的那样,上次我遇到它时,我只是清除了gcs上的临时目录并重新上传了我的模板(没有任何代码更改)并且工作又开始工作了 .

相关问题