首页 文章

如何将CSV文件导入BigQuery表而不使用任何列名或模式?

提问于
浏览
11

我目前正在编写一个Java实用程序,用于将几个CSV文件从GCS导入BigQuery . 我可以通过 bq load 轻松实现这一点,但我想使用Dataflow作业来实现 . 所以我'm using Dataflow'的Pipeline和ParDo转换器(返回TableRow将它应用于BigQueryIO)我已经为转换创建了StringToRowConverter() . 这里实际问题开始了 - 我被迫为目标表指定模式,尽管我不存在 - 只是尝试加载数据 . 所以我不想手动设置TableRow的列名,因为我有大约600列 .

public class StringToRowConverter extends DoFn<String, TableRow> {

private static Logger logger = LoggerFactory.getLogger(StringToRowConverter.class);

public void processElement(ProcessContext c) {
    TableRow row = new TableRow();          
    row.set("DO NOT KNOW THE COLUMN NAME", c.element());
    c.output(row);
}
}

此外,假设该表已存在于BigQuery数据集中,我不需要创建它,并且CSV文件也包含正确顺序的列 .

如果此方案没有解决方法,并且数据加载需要列名,那么我可以将其放在CSV文件的第一行中 .

任何帮助将不胜感激 .

1 回答

  • 6

    为避免创建表,应在管道配置期间使用BigQueryIO.Write的BigQueryIO.Write.CreateDisposition.CREATE_NEVER . 资料来源:https://cloud.google.com/dataflow/java-sdk/JavaDoc/com/google/cloud/dataflow/sdk/io/BigQueryIO.Write

    您不需要事先了解BigQuery表模式,您可以动态地发现它 . 例如,您可以使用BigQuery API(https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/get)查询表模式,并将其作为StringToRowConverter类的参数传递 . 另一个选项并假设第一行是 Headers ,是跳过第一行并使用它来正确映射文件的其余部分 .

    下面的代码实现了第二种方法,并将输出配置为附加到现有的BigQuery表 .

    public class DFJob {
    
        public static class StringToRowConverter extends DoFn<String, TableRow> {
    
            private String[] columnNames;
    
            private boolean isFirstRow = true;
    
            public void processElement(ProcessContext c) {
                TableRow row = new TableRow();
    
                String[] parts = c.element().split(",");
    
                if (isFirstRow) {
                    columnNames = Arrays.copyOf(parts, parts.length);
                    isFirstRow = false;
                } else {
                    for (int i = 0; i < parts.length; i++) {
                        row.set(columnNames[i], parts[i]);
                    }
                    c.output(row);
                }
            }
        }
    
        public static void main(String[] args) {
            DataflowPipelineOptions options = PipelineOptionsFactory.create()
                    .as(DataflowPipelineOptions.class);
            options.setRunner(BlockingDataflowPipelineRunner.class);
    
            Pipeline p = Pipeline.create(options);
    
            p.apply(TextIO.Read.from("gs://dataflow-samples/myfile.csv"))
                    .apply(ParDo.of(new StringToRowConverter()))
                    .apply(BigQueryIO.Write.to("myTable")
                            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
                            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    
            PipelineResult result = p.run();
        }
    }
    

相关问题