首页 文章

AWS胶水和重复数据删除增量CSV文件

提问于
浏览
1

我每天都会将csv文件传送到S3,这些文件是本月的增量文件 . 因此file1包含第1天的数据,file2包含第1天和第2天的数据等等 . 每天我都想对该数据运行ETL并将其写入不同的S3位置,以便我可以使用Athena进行查询没有重复的行存在 . 基本上我只想查询聚合数据的最新状态(这将是最近交付的文件到S3的内容) .

我不认为书签会起作用,因为增量交付包含以前文件中的数据,因此会产生重复 . 我知道如果我在源存储桶中的所有文件上运行,我可以转换为数据框并删除这样的重复项:

spark_df = resolvechoice2.toDF()
spark_df = spark_df.dropDuplicates()

但这似乎会为我添加大量处理,每次都对源表中的所有数据运行ETL .

基本的工作流程就像是,新的文件被交付,也许使用Lambda来启动AWS Glue Job,它只处理该文件的内容,然后替换输出桶的内容 . 输出桶按年和月分区 .

最简单的方法是在每次运行时只启用书签并删除输出桶中的所有内容吗?

2 回答

  • 0

    如果您的文件位于单独的文件夹中,您可以使用Athena的 EXTERNAL TABLE 并且每天指向当前分区(例如使用Lambda): - 删除指向昨天's folder - add partition pointing to today' s文件夹的分区 - 月份's end you leave the partition pointing to the last day (containing whole month'数据) .

    这样您就不需要任何重复数据删除过程,只需管理Athena分区即可 .

  • 0

    这个答案可能会对你有所帮助,它几乎就是你正在寻找的东西:Pyspark read delta/upsert dataset from csv files但它是在'incoming'方面,而不是它已经在数据库中 . 您还可以查看导入导入数据的时间戳:Adding timestamp column in importing data in redshift using AWS Glue Job

相关问题