首页 文章

AWS Glue:如何使用不同的模式ETL非标量JSON

提问于
浏览
0

Objective

我有一个充满json文件的S3文件夹,其中包含不同的模式,包括数组(一个dynamodb备份,因为它发生) . 但是,虽然模式各不相同,但所有文件都包含一些常用元素,例如“id”或“name”,以及不同长度的嵌套数组,例如“selected items” . 我希望能够在闲暇时解析这些元素 .

我有一个使用外部ETL工具(KNIME)的工作方法,我希望通过Glue以无服务器方式进行复制 .

Background

工作方法是:

  • 使用Spectrum将所有S3数据作为外部表加载,每个json记录作为单个 varchar(65535) 条目 .

  • 使用Redshift SQL函数(如 json_extract_path_textjson_array_length )解析我需要的元素,包括特定数组

  • 通过cross joining against a reference table数组索引规范化所需的json数组

  • 执行必需的表连接并写出Redshift以供Tableau使用

现在,这似乎是一个适合Glue的任务 . 也就是说,我想做的是:

  • 使用Spectrum加载所有数据,如上所述

  • 在Glue中,从Spectrum表创建动态帧

  • 使用 pyspark.sql.functions.explode() 之类的函数或者使用Glue的Relationalize变换从上面的动态帧中解析数据

要么:

  • 将所有标量数据爬网到单个Glue模式中(假设Glue尚不支持JSON数组)

  • 使用上述方法之一解析JSON并爆炸数组

Results so far

不幸的是,我无法让这些方法中的任何一种工作 . 对于各种方法,阻滞剂已经:

  • 使用Glue抓取json数据 - 根据this post,Glue的解析器启发式方法决定源的各种模式与单个源的关系太不同,因此将它们解析为一堆不同的表 . 只需抓取每个文件以生成一个包含varchar(65535)类型的单一列的表,每行包含一个json条目,但似乎没有任何Glue分类器JSON路径表达式就足够了 . 实现这一点 .

  • '单varchar(65535)列'方法可以通过在Spectrum中将数据作为外部表加载来实现,但似乎Spectrum表不能作为动态帧加载到Glue中(注意胶水中存在相关表)目录显示为具有预期的varchar(65535)模式 . 在Zeppelin笔记本中工作,我发现了

newFrame = glueContext.create_dynamic_frame.from_catalog(database="<spectrum database>", table_name="<spectrum one column varchar table>")

运行成功,但会产生一个带有 newFrame.count() = 0newFrame.toDF().show(n) 的表,因为任何n值都会产生一个奇怪的输出形式:

++ || ++ ++

简而言之,似乎pySpark无法通过Glue直接使用Spectrum表 .

  • 使用Crawler抓取Spectrum表格 . 在这里,我通过Glue连接到我的Redshift集群,在所需的表上指向了一个爬虫 . 但是,这会导致S3 endpoints 验证失败,我还没有解决 . 我不愿意深入研究VPC配置,因为我已经非常不确定将Crawler指向Spectrum表将是正确的方法 .

简而言之,我发现无法使用Glue Crawler或Glue和Redshift Spectrum的组合动态摄取和解析S3上的非标量json . 这可能不是一个深奥的任务 - 事实上,任何想要采用相对自动化的方法来报告基于Dynamodb的web应用程序的数据的人都需要实现这一目标 .

Question

所以我的问题,在一个声明中:有没有办法解决S3上的非标量json文件,使用不一致的模式,使用Glue(加上可能是另一个AWS服务,如RS Spectrum)?

1 回答

  • 1

    所以我假设在后台进行了一些事情 .

    我假设您在Redshift Spectrum中定义了一个指向S3的外部表?如果是这样那么这不是最好的方法 . 而是定义指向Glue数据目录中的表的外部模式 . 结果是RS Spectrum将查看该Glue数据库中的所有内容,而您无需定义单独的表 .

    其次,您是否尝试在Glue数据目录中手动定义表?我用Parquet文件格式进行了大量测试,弄乱了表定义和文件内容 . 结果是,无论来自该表的任何查询,它都只返回表中定义的数据 . 因此,您可以定义具有“id”,“name”和“selected items”字段的表,其他所有内容都将被忽略 .

    如果由于任何原因,前一个似乎不起作用,那么胶水工作将有所帮助 . 在这里注意 - 总是使用只是spark,永远不要使用任何与glue_context相关的东西 . 无论如何,在spark.read.blaah中,您可以指定架构参数 . 用它 .

    dataSchema = StructType([StructField("id",StringType(),True) ,StructField("name",StringType(),True) ,StructField("selected items",ArrayType(.. etc ...),True) ])

    这样你就可以获得相同的结果 - 它只会解析你想要的数据 .

    最终的设置应该是这样的:

    • S3输入桶有JSON档

    • 胶水作业从输入桶读取数据并写入不同的桶,可能是分区的和/或采用不同的数据格式

    • 胶水表在第二个桶/前缀的顶部定义

    • Redshift Spectrum指向包含上一步中定义的表的数据库

相关问题