首页 文章

AWS Glue:如何使用不同的模式处理嵌套的JSON

提问于
浏览
4

Objective: 我们希望使用AWS Glue数据目录为驻留在S3存储桶中的JSON数据创建单个表,然后我们将通过Redshift Spectrum进行查询和解析 .

Background: JSON数据来自DynamoDB Streams,并且是深层嵌套的 . 第一级JSON具有一组一致的元素:Keys,NewImage,OldImage,SequenceNumber,ApproximateCreationDateTime,SizeBytes和EventName . 唯一的变化是某些记录没有NewImage,而有些记录没有OldImage . 但是,在第一级以下,架构变化很大 .

理想情况下,我们希望使用Glue仅解析第一级JSON,并基本上将较低级别视为大型STRING对象(我们将根据需要使用Redshift Spectrum对其进行解析) . 目前,我们将整个记录加载到Redshift中的单个VARCHAR列中,但记录接近Redshift中数据类型的最大大小(最大VARCHAR长度为65535) . 因此,我们希望在记录达到Redshift之前执行第一级解析 .

What we've tried/referenced so far:

  • 将AWS Glue Crawler指向S3存储桶会导致数百个表具有一致的顶级模式(上面列出的属性),但STRUCT元素中更深层次的模式不同 . 我们还没有找到一种方法来创建一个Glue ETL Job,它将从所有这些表中读取并将其加载到一个表中 .

  • 手动创建表并不富有成效 . 我们尝试将每个列设置为STRING数据类型,但是作业没有成功加载数据(可能因为这会涉及从STRUCT到STRING的一些转换) . 将列设置为STRUCT时,它需要一个已定义的模式 - 但这正是从一个记录到另一个记录的不同,因此我们无法提供适用于所有相关记录的通用STRUCT模式 .

  • AWS胶水Relationalize transform很吸引人,但不是我们在这种情况下要寻找的东西(因为我们希望保留一些JSON完整,而不是完全展平它) . Redshift Spectrum支持几周前的scalar JSON数据,但这不适用于我们正在处理的嵌套JSON . 这些似乎都没有帮助处理由Glue Crawler创建的数百个表 .

Question: 我们如何使用Glue(或其他方法)来解析这些记录的第一级 - 同时忽略顶层元素下面的不同模式 - 这样我们就可以从Spectrum访问它或者物理加载它进入Redshift?

我是Glue的新手 . 我花了很多时间在Glue文档中并在论坛上查看(有点稀疏)信息 . 我可能会遗漏一些明显的东西 - 或者这可能是目前形式的胶水限制 . 欢迎任何建议 .

谢谢!

4 回答

  • 0

    我发现对浅层嵌套json有用的过程:

    • 第一级的ApplyMapping为 datasource0 ;

    • 爆炸 structarray 对象以摆脱元素级别 df1 = datasource0.toDF().select(id,col1,col2,...,explode(coln).alias(coln) ,其中 explode 需要 from pyspark.sql.functions import explode ;

    • 通过 intact_json = df1.select(id, itct1, itct2,..., itctm) 选择您希望保持原样的JSON对象;

    • df1 转换回dynamicFrame并对dynamicFrame进行Relationalize以及删除完整列 dataframe.drop_fields(itct1, itct2,..., itctm) ;

    • 使用基于'id'列的完整表加入关系表 .

  • 0

    这是目前胶水的限制 . 你看过胶水分类器了吗?这是我还没有使用的唯一一件,但可能适合您的需求 . 您可以为字段或类似的东西定义JSON路径 .

    除此之外 - Glue Jobs是要走的路 . 它是背景中的Spark,所以你几乎可以做任何事情 . 设置开发 endpoints 并使用它 . 在过去的三周里,我遇到了各种障碍,并决定完全放弃任何和所有Glue功能,只有Spark,这样既可移动又实际工作 .

    在设置开发 endpoints 时,您可能需要记住的一件事是IAM角色必须具有路径“/”,因此您很可能需要手动创建具有此路径的单独角色 . 自动创建的路径为“/ service-role /” .

  • 0

    我不确定您是否可以使用表定义执行此操作,但是您可以使用映射函数将顶级值强制转换为JSON字符串,从而通过ETL作业完成此操作 . 文件:[link]

    import json
    
    # Your mapping function
    def flatten(rec):
        for key in rec:
            rec[key] = json.dumps(rec[key])
        return rec
    
    old_df = glueContext.create_dynamic_frame.from_options(
        's3',
        {"paths": ['s3://...']},
        "json")
    
    # Apply mapping function f to all DynamicRecords in DynamicFrame
    new_df = Map.apply(frame=old_df, f=flatten)
    

    从这里你可以选择导出到S3(可能是Parquet或其他一些柱状格式以优化查询)或者直接从我的理解中直接进入Redshift,尽管我还没有尝试过 .

  • 0

    你应该添加胶水分类器,最好是$ [*]

    当您在s3中抓取json文件时,它将读取该文件的第一行 .

    您可以创建粘合作业,以便将此json文件的数据目录表加载到redshift中 .

    我唯一的问题是Redshift Spectrum存在问题读取数据目录中的json表..

    如果您找到了解决方案,请告诉我

相关问题