Objective
我有一个充满json文件的S3文件夹,其中包含不同的模式,包括数组(一个dynamodb备份,因为它发生) . 但是,虽然模式各不相同,但所有文件都包含一些常用元素,例如“id”或“name”,以及不同长度的嵌套数组,例如“selected items” . 我希望能够在闲暇时解析这些元素 .
我有一个使用外部ETL工具(KNIME)的工作方法,我希望通过Glue以无服务器方式进行复制 .
Background
工作方法是:
-
使用Spectrum将所有S3数据作为外部表加载,每个json记录作为单个
varchar(65535)
条目 . -
使用Redshift SQL函数(如
json_extract_path_text
和json_array_length
)解析我需要的元素,包括特定数组 -
通过cross joining against a reference table数组索引规范化所需的json数组
-
执行必需的表连接并写出Redshift以供Tableau使用
现在,这似乎是一个适合Glue的任务 . 也就是说,我想做的是:
-
使用Spectrum加载所有数据,如上所述
-
在Glue中,从Spectrum表创建动态帧
-
使用
pyspark.sql.functions.explode()
之类的函数或者使用Glue的Relationalize变换从上面的动态帧中解析数据
要么:
-
将所有标量数据爬网到单个Glue模式中(假设Glue尚不支持JSON数组)
-
使用上述方法之一解析JSON并爆炸数组
Results so far
不幸的是,我无法让这些方法中的任何一种工作 . 对于各种方法,阻滞剂已经:
-
使用Glue抓取json数据 - 根据this post,Glue的解析器启发式方法决定源的各种模式与单个源的关系太不同,因此将它们解析为一堆不同的表 . 只需抓取每个文件以生成一个包含varchar(65535)类型的单一列的表,每行包含一个json条目,但似乎没有任何Glue分类器JSON路径表达式就足够了 . 实现这一点 .
-
'单varchar(65535)列'方法可以通过在Spectrum中将数据作为外部表加载来实现,但似乎Spectrum表不能作为动态帧加载到Glue中(注意胶水中存在相关表)目录显示为具有预期的varchar(65535)模式 . 在Zeppelin笔记本中工作,我发现了
newFrame = glueContext.create_dynamic_frame.from_catalog(database="<spectrum database>", table_name="<spectrum one column varchar table>")
运行成功,但会产生一个带有 newFrame.count() = 0
和 newFrame.toDF().show(n)
的表,因为任何n值都会产生一个奇怪的输出形式:
++
||
++
++
简而言之,似乎pySpark无法通过Glue直接使用Spectrum表 .
- 使用Crawler抓取Spectrum表格 . 在这里,我通过Glue连接到我的Redshift集群,在所需的表上指向了一个爬虫 . 但是,这会导致S3 endpoints 验证失败,我还没有解决 . 我不愿意深入研究VPC配置,因为我已经非常不确定将Crawler指向Spectrum表将是正确的方法 .
简而言之,我发现无法使用Glue Crawler或Glue和Redshift Spectrum的组合动态摄取和解析S3上的非标量json . 这可能不是一个深奥的任务 - 事实上,任何想要采用相对自动化的方法来报告基于Dynamodb的web应用程序的数据的人都需要实现这一目标 .
Question
所以我的问题,在一个声明中:有没有办法解决S3上的非标量json文件,使用不一致的模式,使用Glue(加上可能是另一个AWS服务,如RS Spectrum)?
1 回答
所以我假设在后台进行了一些事情 .
我假设您在Redshift Spectrum中定义了一个指向S3的外部表?如果是这样那么这不是最好的方法 . 而是定义指向Glue数据目录中的表的外部模式 . 结果是RS Spectrum将查看该Glue数据库中的所有内容,而您无需定义单独的表 .
其次,您是否尝试在Glue数据目录中手动定义表?我用Parquet文件格式进行了大量测试,弄乱了表定义和文件内容 . 结果是,无论来自该表的任何查询,它都只返回表中定义的数据 . 因此,您可以定义具有“id”,“name”和“selected items”字段的表,其他所有内容都将被忽略 .
如果由于任何原因,前一个似乎不起作用,那么胶水工作将有所帮助 . 在这里注意 - 总是使用只是spark,永远不要使用任何与glue_context相关的东西 . 无论如何,在spark.read.blaah中,您可以指定架构参数 . 用它 .
dataSchema = StructType([StructField("id",StringType(),True) ,StructField("name",StringType(),True) ,StructField("selected items",ArrayType(.. etc ...),True) ])
这样你就可以获得相同的结果 - 它只会解析你想要的数据 .
最终的设置应该是这样的:
S3输入桶有JSON档
胶水作业从输入桶读取数据并写入不同的桶,可能是分区的和/或采用不同的数据格式
胶水表在第二个桶/前缀的顶部定义
Redshift Spectrum指向包含上一步中定义的表的数据库