
Export BigQuery data as CSV without using Google Cloud Storage


I am currently writing a piece of software that exports large amounts of BigQuery data and stores the query results locally as CSV files. I am using Python 3 and the client provided by Google. Configuration and authentication are done, but the problem is that I cannot store the data locally. Every time I execute it, I get the following error message:

googleapiclient.errors.HttpError: https://www.googleapis.com/bigquery/v2/projects/round-office-769/jobs?alt=json returned "Invalid extract destination URI 'response/file-name-*.csv'. Must be a valid Google Storage path."

This is my job configuration:

import uuid

def export_table(service, cloud_storage_path,
                 projectId, datasetId, tableId, sqlQuery,
                 export_format="CSV",
                 num_retries=5):

    # Generate a unique job_id so retries
    # don't accidentally duplicate export
    job_data = {
        'jobReference': {
            'projectId': projectId,
            'jobId': str(uuid.uuid4())
        },
        'configuration': {
            'extract': {
                'sourceTable': {
                    'projectId': projectId,
                    'datasetId': datasetId,
                    'tableId': tableId,
                },
                'destinationUris': ['response/file-name-*.csv'],
                'destinationFormat': export_format
            },
            'query': {
                'query': sqlQuery,
            }
        }
    }
    return service.jobs().insert(
        projectId=projectId,
        body=job_data).execute(num_retries=num_retries)

I had hoped I could use a local path instead of Cloud Storage to store the data, but I was wrong.

So my question is:

Can I download the query data locally (or into a local database), or do I have to use Google Cloud Storage?

5 Answers

  • 6

    You need to use Google Cloud Storage for export jobs. Exporting data from BigQuery is explained here; also check the variants for the different path syntaxes.

    Then you can download the files from GCS to your local storage.

    The gsutil tool can further help you download the files from GCS to your local machine.

    You cannot download it locally in a single step: you first need to export to GCS, and then transfer it to your local machine.
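    As a sketch of that two-step flow, the extract job body must point at a `gs://` URI instead of a local path; the helper function and bucket name below are illustrative (not from the answer), and the `service` object is the same discovery client as in the question:

    ```python
    import uuid

    def make_extract_job(project_id, dataset_id, table_id,
                         gcs_uri, export_format="CSV"):
        """Build the body for a BigQuery extract job.

        The destination must be a gs:// path; local paths are rejected
        by the API, which is exactly the error from the question.
        """
        return {
            'jobReference': {
                'projectId': project_id,
                # unique id so retries don't accidentally duplicate the job
                'jobId': str(uuid.uuid4()),
            },
            'configuration': {
                'extract': {
                    'sourceTable': {
                        'projectId': project_id,
                        'datasetId': dataset_id,
                        'tableId': table_id,
                    },
                    # e.g. 'gs://my-bucket/file-name-*.csv' -- the wildcard
                    # lets BigQuery shard large tables into multiple files
                    'destinationUris': [gcs_uri],
                    'destinationFormat': export_format,
                }
            }
        }

    # body = make_extract_job(projectId, datasetId, tableId,
    #                         'gs://my-bucket/file-name-*.csv')
    # service.jobs().insert(projectId=projectId, body=body).execute()
    ```

    Once the job completes, something like `gsutil cp 'gs://my-bucket/file-name-*.csv' .` copies the shards to the local machine.
    
    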

  • 5

    You can download all the data directly (without routing it through Google Cloud Storage) using the paging mechanism. Basically you need to fetch a page token for each page, download the data in that page, and repeat until all the data has been downloaded, i.e. no more tokens are available. Here is example code in Java that hopefully clarifies the idea:

    import com.google.api.client.googleapis.auth.oauth2.GoogleCredential;
    import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
    import com.google.api.client.http.HttpTransport;
    import com.google.api.client.json.JsonFactory;
    import com.google.api.client.json.jackson2.JacksonFactory;
    import com.google.api.services.bigquery.Bigquery;
    import com.google.api.services.bigquery.BigqueryScopes;
    import com.google.api.client.util.Data;
    import com.google.api.services.bigquery.model.*;
    
    /* your class starts here */
    
    private String projectId = ""; /* fill in the project id here */
    private String query = ""; /* enter your query here */
    private Bigquery bigQuery;
    private Job insert;
    private TableDataList tableDataList;
    private Iterator<TableRow> rowsIterator;
    private List<TableRow> rows;
    private long maxResults = 100000L; /* max number of rows in a page */
    
    /* run query */
    public void open() throws Exception {
        HttpTransport transport = GoogleNetHttpTransport.newTrustedTransport();
        JsonFactory jsonFactory = new JacksonFactory();
        GoogleCredential credential = GoogleCredential.getApplicationDefault(transport, jsonFactory);
        if (credential.createScopedRequired())
            credential = credential.createScoped(BigqueryScopes.all());
        bigQuery = new Bigquery.Builder(transport, jsonFactory, credential).setApplicationName("my app").build();
    
        JobConfigurationQuery queryConfig = new JobConfigurationQuery().setQuery(query);
        JobConfiguration jobConfig = new JobConfiguration().setQuery(queryConfig);
        Job job = new Job().setConfiguration(jobConfig);
        insert = bigQuery.jobs().insert(projectId, job).execute();
        JobReference jobReference = insert.getJobReference();
    
        while (true) {
            Job poll = bigQuery.jobs().get(projectId, jobReference.getJobId()).execute();
            String state = poll.getStatus().getState();
            if ("DONE".equals(state)) {
                ErrorProto errorResult = poll.getStatus().getErrorResult();
                if (errorResult != null)
                    throw new Exception("Error running job: " + poll.getStatus().getErrors().get(0));
                break;
            }
            Thread.sleep(10000);
        }
    
        tableDataList = getPage();
        rows = tableDataList.getRows();
        rowsIterator = rows != null ? rows.iterator() : null;
    }
    
    /* read data row by row */
    public /* your data object here */ read() throws Exception {
        if (rowsIterator == null) return null;
    
        if (!rowsIterator.hasNext()) {
            String pageToken = tableDataList.getPageToken();
            if (pageToken == null) return null;
            tableDataList = getPage(pageToken);
            rows = tableDataList.getRows();
            if (rows == null) return null;
            rowsIterator = rows.iterator();
        }
    
        TableRow row = rowsIterator.next();
        for (TableCell cell : row.getF()) {
            Object value = cell.getV();
            /* extract the data here */
        }
    
        /* return the data */
    }
    
    private TableDataList getPage() throws IOException {
        return getPage(null);
    }
    
    private TableDataList getPage(String pageToken) throws IOException {
        TableReference sourceTable = insert
                .getConfiguration()
                .getQuery()
                .getDestinationTable();
        if (sourceTable == null)
            throw new IllegalArgumentException("Source table not available. Please check the query syntax.");
        return bigQuery.tabledata()
                .list(projectId, sourceTable.getDatasetId(), sourceTable.getTableId())
                .setPageToken(pageToken)
                .setMaxResults(maxResults)
                .execute();
    }
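    The same token loop, reduced to its core in the question's language (Python); `fetch_page` here stands in for a `tabledata().list(...)` call and is the only assumed interface:

    ```python
    def iterate_rows(fetch_page):
        """Yield every row by following page tokens until none remain.

        fetch_page(page_token) must return a dict shaped like the
        tabledata.list response: {'rows': [...], 'pageToken': '...'}.
        """
        page_token = None
        while True:
            page = fetch_page(page_token)
            for row in page.get('rows', []):
                yield row
            page_token = page.get('pageToken')
            if page_token is None:
                break

    # With the google-api-python-client from the question it would be
    # driven roughly like this (untested sketch):
    #
    # rows = iterate_rows(lambda tok: service.tabledata().list(
    #     projectId=projectId, datasetId=datasetId, tableId=tableId,
    #     pageToken=tok, maxResults=100000).execute())
    ```
    
    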
    
  • 2

    You can run a tabledata.list() operation on that table and set "alt=csv", which will return the beginning of the table as CSV.

  • 0

    Another way is from the UI: once the query results have returned, you can select the "Download as CSV" button.

  • 0

    If you have the Google BigQuery API and pandas (with pandas.io) installed, you can run Python in a Jupyter notebook, query a BQ table, and get the data into a local dataframe. From there you can write it out to CSV.
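    A minimal sketch of that flow, assuming the pandas-gbq integration is installed and application-default credentials are configured; the query, project id, and output path are placeholders:

    ```python
    import pandas as pd

    def bigquery_to_csv(sql, project_id, csv_path):
        # pd.read_gbq runs the query through the BigQuery API and
        # returns the full result set as a local DataFrame
        df = pd.read_gbq(sql, project_id=project_id)
        # write the DataFrame to a local CSV file
        df.to_csv(csv_path, index=False)
        return len(df)

    # bigquery_to_csv("SELECT name, value FROM mydataset.mytable",
    #                 "my-project", "results.csv")
    ```
    
    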
