我正在尝试将Google Cloud Dataflow管道与Google Cloud Pub/Sub Notifications for Google Cloud Storage集成 . 这个想法是在创建文件后立即开始处理文件 . 消息正在发布,并且我使用 PubsubIO.readMessagesWithAttributes()
source来提取文件URI:
Pipeline p = Pipeline.create(options);
PCollection<String> uris = p.apply(PubsubIO.readMessagesWithAttributes()
.withTimestampAttribute(PUBSUB_TIMESTAMP_LABEL_KEY)
.fromSubscription(options.getPubsubSubscription()))
.apply(MapElements
.into(TypeDescriptors.strings())
.via((PubsubMessage msg) -> {
String bucket = msg.getAttribute("bucketId");
String object = msg.getAttribute("objectId");
GcsPath uri = GcsPath.fromComponents(bucket, object);
return uri.toString();
}));
哪个 PTransform
可用于开始读取/处理 uris PCollection
中的每个文件?
2 回答
HEAD中的Apache Beam包含一个完全符合您要求的PTransform:TextIO.readAll()读取
PCollection<String>
文件模式或文件名 . 它将在Beam 2.2.0中提供,但是现在你可以从github repo自己构建一个Beam快照并依赖于它 .使用Google Cloud Functions修改 Cloud 存储更改通知应该是一个不错的选择(尽管仍处于测试阶段) .
使用 Cloud 功能,您可以使用一些Javascript代码启动Dataflow作业 . 这是一个非常好的blogpost,应该让你在路上 . 只要新文件落入存储桶或文件发生更改并处理这些文件,您就会启动数据流作业 .
如果您想坚持自己的方法,可能需要使用Google Cloud Storage Java SDK来读取自定义DoFn中的文件 . 不确定这种方法是否更可取 .