首页 文章

如何使用pubsub通知进行 Cloud 存储以触发数据流管道

提问于
浏览
1

我正在尝试将Google Cloud Dataflow管道与Google Cloud Pub/Sub Notifications for Google Cloud Storage集成 . 这个想法是在创建文件后立即开始处理文件 . 消息正在发布,并且我使用 PubsubIO.readMessagesWithAttributes() source来提取文件URI:

Pipeline p = Pipeline.create(options);
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
            .withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
            .fromSubscription(options.getPubsubSubscription()))
            .apply(MapElements
                    .into(TypeDescriptors.strings())
                    .via((PubsubMessage msg) -> {
                        String bucket = msg.getAttribute("bucketId");
                        String object = msg.getAttribute("objectId");
                        GcsPath uri = GcsPath.fromComponents(bucket, object);
                        return uri.toString();
                    }));

哪个 PTransform 可用于开始读取/处理 uris PCollection 中的每个文件?

2 回答

  • 2

    HEAD中的Apache Beam包含一个完全符合您要求的PTransform:TextIO.readAll()读取 PCollection<String> 文件模式或文件名 . 它将在Beam 2.2.0中提供,但是现在你可以从github repo自己构建一个Beam快照并依赖于它 .

  • 0

    使用Google Cloud Functions修改 Cloud 存储更改通知应该是一个不错的选择(尽管仍处于测试阶段) .

    使用 Cloud 功能,您可以使用一些Javascript代码启动Dataflow作业 . 这是一个非常好的blogpost,应该让你在路上 . 只要新文件落入存储桶或文件发生更改并处理这些文件,您就会启动数据流作业 .

    如果您想坚持自己的方法,可能需要使用Google Cloud Storage Java SDK来读取自定义DoFn中的文件 . 不确定这种方法是否更可取 .

相关问题