-
0 votesanswersviews
使用Beam将Avro写入BigQuery
Q1:假设我使用BigQuery加载工具加载Avro编码数据 . 现在我需要将这些数据写入仍然采用Avro格式的不同表格 . 我试图测试不同的分区,以测试表性能 . 如何使用Beam将SchemaAndRecord写回BigQuery?在这种情况下,模式检测也会工作吗? Q2:从Avro架构类型转换为BigQuery架构类型时,看起来架构信息会丢失 . 例如,在BigQuery中,double和f... -
0 votesanswersviews
Apache Beam字段分区
我想在特定字段中使用Python在Apache Beam中对PCollection进行分区 . 我在Python SDK文档中找到了以下代码,用于对PCollection进行分区 students = ... def partition_fn(student, num_partitions): return int(get_percentile(student) * num_partition... -
0 votesanswersviews
使用Python中的Dataflow Pipeline将数据从Google Datastore传输到Bigquery
我们遇到了从Google Cloud Datastore到Bigquery的数据传输问题 . 我们需要在python中为这个作业创建数据流脚本 . 这个作业应该通过在python中使用管道将数据从数据存储区传输到bigquery . 对于python中的这项工作,它需要“Apache Beam”库 . 但Apache Beam库不起作用 . 有人可以帮助我们吗? -
3 votesanswersviews
如何将表行PCollections转换为Python中的键值PCollections?
没有关于如何将pCollections转换为输入到.CoGroupByKey()所需的pCollections的文档 上下文基本上我有两个大的pCollections,我需要能够找到两者之间的差异,对于第二类ETL更改(如果它在pColl1中不存在,那么添加到pColl2中的嵌套字段),这样我就能够从BigQuery保留这些记录的历史记录 . 管道架构: 将BQ表读入2个pCollection... -
0 votesanswersviews
当timestamp列包含年份<1900时,无法从BigQuery读取数据
在使用最新的Apache Beam SDK for Python 2.2.0定义的管道上,运行读取和写入BigQuery表的简单管道时出现此错误 . 由于少数行具有年份<1900的时间戳,因此读取操作失败 . 如何修补此dataflow_worker包? apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException... -
2 votesanswersviews
在Beam管道中以编程方式生成BigQuery模式
我有一个同类dicts的集合,如何在不知道架构的情况下将它们写入BigQuery? BigQuerySink要求我在构造架构时指定架构 . 但是,我不知道架构:它是由我正在尝试编写的词组的键来定义的 . 有没有办法让我的管道推断出架构,然后将它(作为侧面输入?)提供给接收器? 例如: # Create a PCollection of dicts, something like # {'field... -
0 votesanswersviews
Apache Beam:将具有对象列表的对象转换为多个TableRows以写入BigQuery
我正在处理一个梁管道来处理一个json并将它写入bigquery . JSON就是这样的 . { "message": [{ "name": "abc", "itemId": "2123", "itemName": "test" }... -
1 votesanswersviews
从Dataflow流式传输时从BigQuery中删除数据
是否可以在从Apache Beam管道将数据加载到BigQuery表中时从BigQuery表中删除数据 . 我们的用例是这样的,我们需要根据时间戳字段(Dataflow从Pubsub主题中提取消息的时间)从表中删除3天的先前数据 . 建议做这样的事吗?如果是的话,有没有办法实现这个目标? 谢谢 . -
0 votesanswersviews
Apache Beam Dataflow BigQuery
如何使用带有DataflowRunner的apache beam从Google BigQuery数据集中获取表格列表? 我找不到如何从指定的数据集中获取表 . 我想使用Dataflow的并行处理编程模型将表格从位于美国的数据集迁移到EU中的表格 . -
2 votesanswersviews
Apache Beam with Dataflow - 从BigQuery读取时为Nullpointer
我正在使用从BigQuery表和文件中读取的apache beam编写的google数据流运行 . 转换数据并将其写入其他BigQuery表 . 作业“通常”成功,但有时我从大查询表中读取并且我的作业失败时随机获取nullpointer异常: (288abb7678892196): java.lang.NullPointerException at org.apache.beam.sdk.io.g... -
0 votesanswersviews
Google数据流 - Apache Beam StreamingOptions.streaming无效
我们创建了一个Apache Beam管道,在直接管道上运行时可以成功运行 . 见下面的问题: - BigQuery writeTableRows Always writing to buffer 将相同的代码推送到Google DataFlow后,它将更改为Streaming并且不会批量处理 . rows.apply("Load", BigQueryIO.writeTableR... -
0 votesanswersviews
使用连接从BigQuery读取批量数据
我有一个用例,我必须通过在20个不同的BQ表上应用左连接来读取BigQuery中的选定数据,对该数据应用转换,然后最终转储到最终的BQ表中 . 我有两种方法可以实现这一点,我尝试了样本数据(从20个表中共计10到12百万行)并获得如下结果: 方法1 - 为整个过程编写BQ兼容查询,并通过Apache Beam(使用BigQueryIO.readTableRows())触发相同的查询,最后将数据转... -
3 votesanswersviews
通过Apache Beam写入动态BigQuery表
我在运行时获取BigQuery表名称,并将该名称传递给管道末尾的BigQueryIO.write操作以写入该表 . 我为它编写的代码是: rows.apply("write to BigQuery", BigQueryIO .writeTableRows() .withSchema(schema) .to("projectID:DatasetI... -
1 votesanswersviews
如何从BigQuery读取流
我正在使用Java Apache Beam SDK for Java 2.0.1-SNAPSHOT Scenario: 从BigQuery(BQ)读取数据 - >数据流中的ETL过程 - >在BQ表中写入数据 问题是管道正在尝试在BQ中执行插入之前处理所有数据 . Is there a way to execute stream inserts in this case? 我已经尝... -
1 votesanswersviews
Apache Beam窗口化和分片BigQuery输出表
我的用例很简单:从Pub / Sub订阅中读取事件日志,解析它们并保存到BigQuery中 . 因为事件的数量预计会显着增长,并且我使用无界数据源,所以我决定在BigQuery中配置分片:根据事件数据的时间戳将事件存储到日常表中(在Beam文档中称为"event time") . 我的问题是我需要在我的情况下配置窗口,或者我可以保留隐式使用全局窗口的默认配置吗?我没有使用 Gr... -
0 votesanswersviews
Python Apache流束流到Bigquery的失败
我正在尝试将Apache光束输出写入Bigquery Table,但它的下载失败,如下所示 . 请帮助 . 代码: import logging import apache_beam as beam from mongodbio import ReadFromMongo TABLE_SCHEMA = ('clause_type:STRING, count:INTEGER') def trans... -
1 votesanswersviews
Apache Beam GroupByKey()在Python上使用Google DataFlow运行时失败
我有一个使用Python SDK 2.2.0 for Apache Beam的管道 . 这个管道几乎是一个典型的字数:我有 ("John Doe, Jane Smith", 1) 格式的名字对,我想弄清楚每对名字出现在一起的次数,如下所示: p_collection | "PairWithOne" >> beam.Map(... -
0 votesanswersviews
Apache Beam DynamicDestinations的文件结构写入BigQuery
我正在使用DynamicDestinations(来自BigQueryIO)将数据从一个Cassandra表导出到多个Google BigQuery表 . 该过程包括几个步骤,包括将准备好的数据写入Google Cloud 端存储(作为JSON格式的文件),然后通过加载作业将文件加载到BQ . 问题是导出过程已在最后一步因内存不足错误而结束(将文件从Google Storage加载到BQ) . 但... -
0 votesanswersviews
Apache beam和BigQuery
我正在尝试执行apache beam sdk 2.4和库com.google.cloud.bigquery但是你的异常 Exception in thread main java.lang.NoSuchMethodError com.google.api.client.googleapis.services.json.AbstractGoogleJsonClient$Builder.setBatc... -
0 votesanswersviews
Apache Beam中的DymanicDestinations
我有一个PCollection [String]说“X”我需要在BigQuery表中转储 . 表目的地和它的模式在PCollection [TableRow]中说“Y” . 如何以最简单的方式实现这一目标? 我尝试从“Y”中提取表和模式,并将其保存在静态全局变量(分别为tableName和schema)中 . 但奇怪的是,BigQueryIO.writeTableRows()总是将变量tableN... -
0 votesanswersviews
通过Apache Beam访问BigQuery表
我正在使用DataFlow v1.9使用以下代码检索BigQuery表的模式: Bigquery bigQueryClient=Transport.newBigQueryClient(options.as(BigQueryOptions.class)).build(); Tables tableRequest = bigQueryClient.tables(); Table table = tab... -
1 votesanswersviews
使用Apache Beam将结果列添加到现有BigQuery表
我想添加列,这是BigQuery中两个现有列的结果 . 我正在使用Apache Beam从BigQuery读取,然后处理它并将结果更新为与新列相同的BigQuery表 . -
0 votesanswersviews
Apache Beam无法读取Avro文件
我需要通过java读取本地或gcs的avro文件 . 我按照https://beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/apache/beam/sdk/io/AvroIO.html中的文档示例 Pipeline p = ...; // A Read from a GCS file (runs locally an... -
0 votesanswersviews
根据消息内容从Dataflow作业写入动态PubSub主题
我想根据字段的内容动态地将PCollection的不同元素路由到不同的PubSub主题 . 主题不是持久的,但是假设它们在运行时执行PubSubIO.Write()时存在 . 因此,Dataflow应该仅在每个消息的基础上在运行时推断它们的名称 . BigQuery和动态表名称存在此功能:https://beam.apache.org/documentation/sdks/javadoc/2.0.... -
1 votesanswersviews
Apache Beam DynamicAvroDestinations DefaultFilenamePolicy,使用String而不是ResourceId
根据https://beam.apache.org/documentation/sdks/javadoc/2.4.0/org/apache/beam/sdk/io/AvroIO.html上的写例 以下代码应该有效: public FilenamePolicy getFilenamePolicy(Integer userId) { return DefaultFilenamePolicy.f... -
0 votesanswersviews
来自KafkaIO的Apache Beam Stream - 窗口需求
我使用KafkaIO API从Kafka主题传输消息https://beam.apache.org/documentation/sdks/javadoc/2.0.0/org/apache/beam/sdk/io/kafka/KafkaIO.html 管道流程如下: KafkaStream - >使用变换器解码消息 - >保存到BigQuery 我解码消息并使用BigQueryIO保存... -
1 votesanswersviews
Wait.on(信号)在Apache Beam中使用
写入1st之后是否可以使用Wait.on()方法(Apache Beam 2.4中的新功能)在批处理管道中写入第二个BigQuery表? Apache Beam文档中给出的示例是: PCollection<Void> firstWriteResults = data.apply(ParDo.of(...write to first database...)); data.apply(... -
1 votesanswersviews
Apache Beam - 你如何从无限制的PCollection中进行限制?
出于调试目的,我希望能够将无界PCollection转换为有界PCollection . 有一种直截了当的方式吗?在我看来,除此之外,这样做是为了强制管道完成,这将是有用的 . 我认为Sample.any()(这里是Javadoc:https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transfo... -
0 votesanswersviews
Apache beam,使用java 8 lambdas处理错误/侧输出
任何人都可以使用 Java8 Lambdas 提供如何使用 Java8 Lambdas 处理无效输入的例子,我们可以在这篇优秀的博客文章_1438919中找到 . 或者我只是使用 ParDo 和 DoFn PS我正在使用 beam 2.1.0 干杯, -
1 votesanswersviews
Apache Beam - > BigQuery - 用于重复数据删除的insertId不起作用
我使用带有google dataflow runner的apache beam将数据从kafka传输到BigQuery . 我想利用insertId进行重复数据删除,我发现谷歌文档中描述了这一点 . 但是即使插件在几秒钟之内发生,我仍然会看到很多行具有相同的insertId . 现在我想知道也许我没有正确使用API来利用BQ提供的流媒体插入的重复数据删除机制 . 我在写入梁的代码如下: payme...