首页 文章

如何使用AWS Glue / Spark将在S3中分区和拆分的CSV转换为分区和拆分Parquet

提问于
浏览
2

在AWS Glue的目录中,我有一个外部表定义了分区,在S3中看起来大致如此,并且每天添加新日期的分区:

s3://my-data-lake/test-table/
    2017/01/01/
        part-0000-blah.csv.gz
        .
        .
        part-8000-blah.csv.gz
    2017/01/02/
        part-0000-blah.csv.gz
        .
        .
        part-7666-blah.csv.gz

我怎么能用Glue / Spark把它转换成镶木地板,它也是按日期划分的,每天分成n个文件?这些示例不包括分区或拆分或配置(有多少节点和多大) . 每天包含几百GB .

因为源CSV不一定在正确的分区(错误的日期)并且大小不一致,所以我希望用正确的分区和更一致的大小写入分区镶木地板 .

2 回答

  • 6

    由于源CSV文件不一定在正确的日期,您可以向其添加有关收集日期时间的其他信息(或使用任何已有的日期):

    {"collectDateTime": {
        "timestamp": 1518091828,
        "timestampMs": 1518091828116,
        "day": 8,
        "month": 2,
        "year": 2018
    }}
    

    然后,您的作业可以在输出DynamicFrame中使用此信息,并最终将它们用作分区 . 一些示例代码如何实现此目的:

    from awsglue.transforms import *
    from pyspark.sql.types import *
    from awsglue.context import GlueContext
    from awsglue.utils import getResolvedOptions
    
    import sys
    import datetime
    
    ###
    # CREATE THE NEW SIMPLIFIED LINE
    ##
    def create_simplified_line(event_dict):
    
        # collect date time
        collect_date_time_dict = event_dict["collectDateTime"]
    
        new_line = {
            # TODO: COPY YOUR DATA HERE
            "myData": event_dict["myData"],
            "someOtherData": event_dict["someOtherData"],
            "timestamp": collect_date_time_dict["timestamp"],
            "timestampmilliseconds": long(collect_date_time_dict["timestamp"]) * 1000,
            "year": collect_date_time_dict["year"],
            "month": collect_date_time_dict["month"],
            "day": collect_date_time_dict["day"]
        }
    
        return new_line
    
    
    ###
    # MAIN FUNCTION
    ##
    
    # context
    glueContext = GlueContext(SparkContext.getOrCreate())
    
    # fetch from previous day source bucket
    previous_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)
    
    # build s3 paths
    s3_path = "s3://source-bucket/path/year={}/month={}/day={}/".format(previous_date.year, previous_date.month, previous_date.day)
    
    # create dynamic_frame
    dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path]}, format="json", format_options={}, transformation_ctx="dynamic_frame")
    
    # resolve choices (optional)
    dynamic_frame_resolved = ResolveChoice.apply(frame=dynamic_frame,choice="project:double",transformation_ctx="dynamic_frame_resolved")
    
    # transform the source dynamic frame into a simplified version
    result_frame = Map.apply(frame=dynamic_frame_resolved, f=create_simplified_line)
    
    # write to simple storage service in parquet format
    glueContext.write_dynamic_frame.from_options(frame=result_frame, connection_type="s3", connection_options={"path":"s3://target-bucket/path/","partitionKeys":["year", "month", "day"]}, format="parquet")
    

    没有测试它,但脚本只是如何实现这一点的一个示例,并且相当简单 .

    UPDATE

    1) As for having specific file sizes/numbers in output partitions

    Spark的coalesce and repartition功能尚未在Glue的Python API中实现(仅在Scala中) .

    您可以将动态帧转换为数据帧并利用Spark的partition capabilities .

    转换为基于“partition_col”的数据帧和分区partitioned_dataframe = datasource0.toDF() . repartition(1)转换回DynamicFrame进行进一步处理 . partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe,glueContext,“partitioned_df”)

    好消息是Glue有一个有趣的功能,如果每个分区有超过50,000个输入文件,它会自动将它们分组给你 .

    如果您想要专门设置此行为而不管输入文件编号( your case ),您可以设置以下 connection_options 而"creating a dynamic frame from options":

    dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, format="json", format_options={}, transformation_ctx="dynamic_frame")
    

    在前面的示例中,它将尝试将文件分组为1MB组 .

    值得一提的是,这与coalesce不同,但如果您的目标是减少每个分区的文件数量,它可能会有所帮助 .

    2) If files already exist in the destination, will it just safely add it (not overwrite or delete)

    对于 write_dynamic_frame.from_options ,胶水的默认SaveModeappend .

    将DataFrame保存到数据源时,如果数据/表已存在,则DataFrame的内容应附加到现有数据 .

    3) Given each source partition may be 30-100GB, what's a guideline for # of DPUs

    我担心我无法回答这个问题 . 这取决于它加载输入文件的速度(大小/数量),脚本的转换等 .

  • 0

    导入日期时间库

    import datetime
    

    根据分区条件拆分时间戳

    now=datetime.datetime.now()
    year= str(now.year)
    

    month= str(now.month) day= str(now.day)

    currdate= "s3:/Destination/"+year+"/"+month+"/"+day
    

    在writer类的路径地址中添加变量currdate . 结果将是镶木地板文件 .

相关问题