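I am trying to create a template for a Cloud Dataflow job that reads JSON files from Cloud Storage and writes them to BigQuery. I pass two runtime parameters: 1. InputFile, the GCS location of the input; 2. the dataset and table ID for BigQuery.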
JsonTextToBqTemplate code:
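import com.google.api.services.bigquery.model.TableRow;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class JsonTextToBqTemplate {
    // Log under this class's own name (the original referenced TextToBQTemplate).
    private static final Logger logger =
            LoggerFactory.getLogger(JsonTextToBqTemplate.class);
    private static Gson gson = new GsonBuilder().create();

    public static void main(String[] args) throws Exception {
        JsonToBQTemplateOptions options =
                PipelineOptionsFactory.fromArgs(args).withValidation()
                        .as(JsonToBQTemplateOptions.class);
        String jobName = options.getJobName();
        try {
            logger.info("PIPELINE-INFO: jobName={} message={} ",
                    jobName, "starting pipeline creation");
            Pipeline pipeline = Pipeline.create(options);
            // inputFile is a ValueProvider, so its value is resolved at run time.
            pipeline.apply("ReadLines", TextIO.read().from(options.getInputFile()))
                    .apply("Converting to TableRows", ParDo.of(new DoFn<String, TableRow>() {
                        private static final long serialVersionUID = 0;

                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            // Each input line is expected to be one JSON object.
                            String json = c.element();
                            TableRow tableRow = gson.fromJson(json, TableRow.class);
                            c.output(tableRow);
                        }
                    }))
                    .apply(BigQueryIO.writeTableRows().to(options.getTableSpec())
                            .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
            logger.info("PIPELINE-INFO: jobName={} message={} ", jobName, "pipeline started");
            State state = pipeline.run().waitUntilFinish();
            logger.info("PIPELINE-INFO: jobName={} message={} ", jobName, "pipeline status" + state);
        } catch (Exception exception) {
            throw exception;
        }
    }
}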
Options code:
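import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// Runtime parameters are declared as ValueProvider so a template can defer them.
public interface JsonToBQTemplateOptions extends PipelineOptions {
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);

    ValueProvider<String> getErrorOutput();
    void setErrorOutput(ValueProvider<String> value);

    ValueProvider<String> getTableSpec();
    void setTableSpec(ValueProvider<String> value);
}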
Maven command to create the template:
mvn -X compile exec:java \
-Dexec.mainClass=com.xyz.adp.pipeline.template.JsonTextToBqTemplate \
-Dexec.args="--project=xxxxxx-12356 \
--stagingLocation=gs://xxx-test/template/staging/jsontobq/ \
--tempLocation=gs://xxx-test/temp/ \
--templateLocation=gs://xxx-test/template/templates/jsontobq \
--errorOutput=gs://xxx-test/template/output"
Error:
Caused by: java.lang.IllegalStateException: Cannot estimate size of a FileBasedSource with inaccessible file pattern: {}. [RuntimeValueProvider{propertyName=inputFile, default=null, value=null}]
at org.apache.beam.sdk.repackaged.com.google.common.base.Preconditions.checkState(Preconditions.java:518)
at org.apache.beam.sdk.io.FileBasedSource.getEstimatedSizeBytes(FileBasedSource.java:199)
at org.apache.beam.runners.direct.BoundedReadEvaluatorFactory$InputProvider.getInitialInputs(BoundedReadEvaluatorFactory.java:207)
at org.apache.beam.runners.direct.ReadEvaluatorFactory$InputProvider.getInitialInputs(ReadEvaluatorFactory.java:87)
at org.apache.beam.runners.direct.RootProviderRegistry.getInitialInputs(RootProviderRegistry.java:62)
When I pass values for inputFile and tableSpec, the Maven build succeeds.
mvn -X compile exec:java \
-Dexec.mainClass=com.ihm.adp.pipeline.template.JsonTextToBqTemplate \
-Dexec.args="--project=xxxxxx-123456 \
--stagingLocation=gs://xxx-test/template/staging/jsontobq/ \
--tempLocation=gs://xxx-test/temp/ \
--templateLocation=gs://xxx-test/template/templates/jsontobq \
--inputFile=gs://xxx-test/input/bqtest.json \
--tableSpec=xxx_test.jsontobq_test \
--errorOutput=gs://xxx-test/template/output"
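But it does not create any template in Cloud Dataflow.
Is there a way to create the template during the Maven run without validating these runtime parameters?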
2 Answers
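I think the problem here is that you have not specified a runner, so the pipeline defaults to the DirectRunner. Try passing --runner=DataflowRunner as part of -Dexec.args. Once you do that, you should also no longer need to specify the ValueProvider template parameters such as inputFile. More information:
https://cloud.google.com/dataflow/docs/templates/creating-templates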
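The suggestion in this answer can be sketched as the command below. It is the template-creation command from the question with a single addition, --runner=DataflowRunner, the standard Beam option for selecting the Dataflow runner. The project ID, bucket paths, and main class are the question's own placeholders, and inputFile and tableSpec are left out on the assumption that they will be supplied when the template is executed.

```shell
# Same command as in the question, plus --runner=DataflowRunner.
# Runtime parameters (inputFile, tableSpec) are intentionally omitted.
mvn -X compile exec:java \
  -Dexec.mainClass=com.xyz.adp.pipeline.template.JsonTextToBqTemplate \
  -Dexec.args="--runner=DataflowRunner \
  --project=xxxxxx-12356 \
  --stagingLocation=gs://xxx-test/template/staging/jsontobq/ \
  --tempLocation=gs://xxx-test/temp/ \
  --templateLocation=gs://xxx-test/template/templates/jsontobq \
  --errorOutput=gs://xxx-test/template/output"
```

If the runner option is picked up, the pipeline should be staged as a template at the --templateLocation path instead of being executed locally by the DirectRunner, which is where the FileBasedSource size check in the stack trace comes from.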
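The parameters you need to specify depend on whether you are using Dataflow SDK version 1.x or Dataflow SDK version 2.x (Apache Beam). It looks like you are using Dataflow SDK version 2.x but have not specified DataflowRunner for the runner parameter.
Reference: https://cloud.google.com/dataflow/docs/templates/creating-templates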