Problem creating a custom Cloud Dataflow template

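I'm trying to create a template for a Cloud Dataflow job that reads JSON files from Cloud Storage and writes them to BigQuery. I pass two runtime parameters: (1) inputFile, the GCS location of the input, and (2) tableSpec, the BigQuery dataset and table ID.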

JsonTextToBqTemplate code:

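package com.xyz.adp.pipeline.template;

import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonTextToBqTemplate {

    // Log under this class (the original referenced TextToBQTemplate.class).
    private static final Logger logger =
            LoggerFactory.getLogger(JsonTextToBqTemplate.class);

    // Static, so it is not serialized together with the DoFn.
    private static final Gson gson = new GsonBuilder().create();

    public static void main(String[] args) {

        JsonToBQTemplateOptions options = PipelineOptionsFactory.fromArgs(args)
                .withValidation()
                .as(JsonToBQTemplateOptions.class);

        String jobName = options.getJobName();
        logger.info("PIPELINE-INFO: jobName={} message={}", jobName, "starting pipeline creation");

        Pipeline pipeline = Pipeline.create(options);
        pipeline
                // Pass the ValueProvider itself; calling .get() here would fail
                // while the template is being built.
                .apply("ReadLines", TextIO.read().from(options.getInputFile()))
                .apply("Converting to TableRows", ParDo.of(new DoFn<String, TableRow>() {
                    private static final long serialVersionUID = 0;

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        // Each input line is expected to hold one JSON object.
                        TableRow tableRow = gson.fromJson(c.element(), TableRow.class);
                        c.output(tableRow);
                    }
                }))
                .apply(BigQueryIO.writeTableRows()
                        .to(options.getTableSpec())
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

        logger.info("PIPELINE-INFO: jobName={} message={}", jobName, "pipeline started");
        State state = pipeline.run().waitUntilFinish();
        logger.info("PIPELINE-INFO: jobName={} message={}", jobName, "pipeline status=" + state);
    }
}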

Options code:

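package com.xyz.adp.pipeline.template;

import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// Template parameters are declared as ValueProvider<String> so that their
// values can be supplied when the template is launched, not when it is built.
public interface JsonToBQTemplateOptions extends PipelineOptions {

    ValueProvider<String> getInputFile();

    void setInputFile(ValueProvider<String> value);

    ValueProvider<String> getErrorOutput();

    void setErrorOutput(ValueProvider<String> value);

    ValueProvider<String> getTableSpec();

    void setTableSpec(ValueProvider<String> value);
}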

Maven command to create the template:

mvn -X compile exec:java \
    -Dexec.mainClass=com.xyz.adp.pipeline.template.JsonTextToBqTemplate \
    -Dexec.args="--project=xxxxxx-12356 \
    --stagingLocation=gs://xxx-test/template/staging/jsontobq/ \
    --tempLocation=gs://xxx-test/temp/ \
    --templateLocation=gs://xxx-test/template/templates/jsontobq \
    --errorOutput=gs://xxx-test/template/output"

Error:

Caused by: java.lang.IllegalStateException: Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}. [RuntimeValueProvider{propertyName=inputFile, default=null, value=null}]
at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:199)
at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:207)
at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:87)
at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:62)
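The stack trace shows what happened. The frames that call into `FileBasedSource` come from `org.apache.beam.runners.direct`. Because no `--runner` flag was passed, the DirectRunner executed the pipeline immediately. It tried to size the `TextIO` source while `inputFile` was still an unresolved `RuntimeValueProvider` (`value=null`). The stdlib-only model below sketches that deferred-value behaviour. `Param`, `StaticParam`, `RuntimeParam`, and `SourceModel` are hypothetical stand-ins, loosely patterned on Beam's `ValueProvider`, `StaticValueProvider`, and `RuntimeValueProvider`. They are not the real Beam API.

```java
// Hypothetical stand-in for Beam's ValueProvider<T>; not the real Beam API.
interface Param<T> {
    boolean isAccessible();
    T get();
}

// A value fixed when the pipeline is built (cf. StaticValueProvider).
final class StaticParam<T> implements Param<T> {
    private final T value;

    StaticParam(T value) {
        this.value = value;
    }

    public boolean isAccessible() {
        return true;
    }

    public T get() {
        return value;
    }
}

// A value that only exists once the templated job is launched (cf. RuntimeValueProvider).
final class RuntimeParam<T> implements Param<T> {
    private final String name;
    private T value; // stays null until a launch supplies it

    RuntimeParam(String name) {
        this.name = name;
    }

    void supplyAtLaunch(T v) {
        this.value = v;
    }

    public boolean isAccessible() {
        return value != null;
    }

    public T get() {
        if (value == null) {
            throw new IllegalStateException("Value for " + name + " is only available at runtime");
        }
        return value;
    }
}

// Models the precondition from the stack trace: a file source cannot be sized
// while its file pattern is still unresolved.
final class SourceModel {
    static String estimateSize(Param<String> filePattern) {
        if (!filePattern.isAccessible()) {
            throw new IllegalStateException(
                    "Cannot estimate size of a FileBasedSource with inaccessible file pattern");
        }
        return "sizing " + filePattern.get();
    }
}
```

Sizing a `RuntimeParam` that has never been supplied throws, and the same call succeeds once a value is provided at launch. In this model, that is why the template has to be built by a runner that stages the pipeline graph instead of executing it.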

When I pass values for inputFile and tableSpec, the Maven build succeeds:

mvn -X compile exec:java \
    -Dexec.mainClass=com.ihm.adp.pipeline.template.JsonTextToBqTemplate \
    -Dexec.args="--project=xxxxxx-123456 \
    --stagingLocation=gs://xxx-test/template/staging/jsontobq/ \
    --tempLocation=gs://xxx-test/temp/ \
    --templateLocation=gs://xxx-test/template/templates/jsontobq \
    --inputFile=gs://xxx-test/input/bqtest.json \
    --tableSpec=xxx_test.jsontobq_test \
    --errorOutput=gs://xxx-test/template/output"

However, it does not create any template in Cloud Dataflow.

Is there a way to create the template during the Maven execution without validating these runtime parameters?

2 Answers

  • 1

    I think the problem here is that you haven't specified a runner. By default, the pipeline tries to use the DirectRunner. Try passing

    --runner=TemplatingDataflowPipelineRunner
    

    as part of -Dexec.args. After that, you no longer need to specify ValueProvider template parameters such as inputFile.

    More information:

    https://cloud.google.com/dataflow/docs/templates/creating-templates

  • 1

    If you are using Dataflow SDK 1.x, you need to specify the following parameters:

    --runner=TemplatingDataflowPipelineRunner
    --dataflowJobFile=gs://xxx-test/template/templates/jsontobq/
    

    If you are using Dataflow SDK 2.x (Apache Beam), you need to specify the following parameters:

    --runner=DataflowRunner
    --templateLocation=gs://xxx-test/template/templates/jsontobq/
    

    It looks like you are using Dataflow SDK 2.x but are not specifying DataflowRunner as the runner parameter.

    Reference: https://cloud.google.com/dataflow/docs/templates/creating-templates
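The sketch below combines the second answer's SDK 2.x flags with the question's own command to show a template-creation run followed by a later launch. It is untested. The project, bucket, and table names are the question's placeholders, and `jsontobq-run` and `REGION` are hypothetical. It also assumes the Dataflow runner artifact (`beam-runners-google-cloud-dataflow-java`) is on the project's classpath, so that `--runner=DataflowRunner` can be resolved.

```shell
# 1. Build the template. With --runner=DataflowRunner and --templateLocation,
#    the pipeline graph is written to GCS instead of being executed locally,
#    so inputFile and tableSpec can be left unset here.
mvn compile exec:java \
    -Dexec.mainClass=com.xyz.adp.pipeline.template.JsonTextToBqTemplate \
    -Dexec.args="--runner=DataflowRunner \
    --project=xxxxxx-12356 \
    --stagingLocation=gs://xxx-test/template/staging/jsontobq/ \
    --tempLocation=gs://xxx-test/temp/ \
    --templateLocation=gs://xxx-test/template/templates/jsontobq \
    --errorOutput=gs://xxx-test/template/output"

# 2. Launch a job from the template and supply the runtime parameters now.
#    jsontobq-run and REGION are hypothetical placeholders.
gcloud dataflow jobs run jsontobq-run \
    --gcs-location=gs://xxx-test/template/templates/jsontobq \
    --region=REGION \
    --parameters=inputFile=gs://xxx-test/input/bqtest.json,tableSpec=xxx_test.jsontobq_test
```

Because errorOutput is passed at build time in step 1, it becomes a fixed value baked into the template. inputFile and tableSpec are resolved only when step 2 launches the job.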
