在AWS Glue的目录中,我有一个外部表定义了分区,在S3中看起来大致如此,并且每天添加新日期的分区:
s3://my-data-lake/test-table/
2017/01/01/
part-0000-blah.csv.gz
.
.
part-8000-blah.csv.gz
2017/01/02/
part-0000-blah.csv.gz
.
.
part-7666-blah.csv.gz
我怎么能用Glue / Spark把它转换成镶木地板,它也是按日期划分的,每天分成n个文件?这些示例不包括分区或拆分或配置(有多少节点和多大) . 每天包含几百GB .
因为源CSV不一定在正确的分区(错误的日期)并且大小不一致,所以我希望用正确的分区和更一致的大小写入分区镶木地板 .
2 回答
由于源CSV文件不一定在正确的日期,您可以向其添加有关收集日期时间的其他信息(或使用任何已有的日期):
然后,您的作业可以在输出DynamicFrame中使用此信息,并最终将它们用作分区 . 一些示例代码如何实现此目的:
没有测试它,但脚本只是如何实现这一点的一个示例,并且相当简单 .
UPDATE
1) As for having specific file sizes/numbers in output partitions ,
Spark的coalesce and repartition功能尚未在Glue的Python API中实现(仅在Scala中) .
您可以将动态帧转换为数据帧并利用Spark的partition capabilities .
好消息是Glue有一个有趣的功能,如果每个分区有超过50,000个输入文件,它会自动将它们分组给你 .
如果您想要专门设置此行为而不管输入文件编号( your case ),您可以设置以下
connection_options
而"creating a dynamic frame from options":在前面的示例中,它将尝试将文件分组为1MB组 .
值得一提的是,这与coalesce不同,但如果您的目标是减少每个分区的文件数量,它可能会有所帮助 .
2) If files already exist in the destination, will it just safely add it (not overwrite or delete)
对于
write_dynamic_frame.from_options
,胶水的默认SaveMode是 append .3) Given each source partition may be 30-100GB, what's a guideline for # of DPUs
我担心我无法回答这个问题 . 这取决于它加载输入文件的速度(大小/数量),脚本的转换等 .
导入日期时间库
根据分区条件拆分时间戳
month= str(now.month)
day= str(now.day)
在writer类的路径地址中添加变量currdate . 结果将是镶木地板文件 .