
Transforming JSON in AWS Glue and uploading to Amazon Redshift


I came across this Amazon article about flattening JSON files and loading them into Redshift:

https://aws.amazon.com/blogs/big-data/simplify-querying-nested-json-with-the-aws-glue-relationalize-transform/

My plan is to transform the JSON files and upload them to S3, then crawl the files again with AWS Glue into the Data Catalog, and load the data as tables into Amazon Redshift.

Now, the code from "Example 3: Python code to transform the nested JSON and output it to ORC" in that post shows this error:

NameError: name 'spark' is not defined

Now I am lost, because I am new to AWS Glue and I need to load these JSON files (they are nested arrays) into Redshift.

Here is my code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
#from awsglue.transforms import Relationalize



# Begin variables to customize with your information
glue_source_database = "DATABASE"
glue_source_table = "TABLE_NAME"
glue_temp_storage = "s3://XXXXX"
glue_relationalize_output_s3_path = "s3://XXXXX"
dfc_root_table_name = "root" #default value is "roottable"
# End variables to customize with your information

glueContext = GlueContext(spark.sparkContext)
datasource0 = glueContext.create_dynamic_frame.from_catalog(database = glue_source_database, table_name = glue_source_table, transformation_ctx = "datasource0")
dfc = Relationalize.apply(frame = datasource0, staging_path = glue_temp_storage, name = dfc_root_table_name, transformation_ctx = "dfc")
blogdata = dfc.select(dfc_root_table_name)
blogdataoutput = glueContext.write_dynamic_frame.from_options(frame = blogdata, connection_type = "s3", connection_options = {"path": glue_relationalize_output_s3_path}, format = "orc", transformation_ctx = "blogdataoutput")

2 Answers

  • 0

    You are creating the GlueContext incorrectly. Your code should look like this:

    from pyspark.context import SparkContext
    
    glueContext = GlueContext(SparkContext.getOrCreate())
    

    You can take a look at the Glue code examples from AWS.

  • 0

    @beni

    I followed the same guide and ran into the same problem as you. With the Spark context created correctly, another problem shows up when writing with glueContext.write_dynamic_frame.from_options.

    Checking the logs, I saw a null value error, so adding DropNullFields.apply solved the problem:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    ## @params: [JOB_NAME]
    args = getResolvedOptions(sys.argv, ['JOB_NAME'])
    
    # Begin variables to customize with your information
    glue_source_database = "database_name"
    glue_source_table = "table_name"
    glue_temp_storage = "s3://bucket/tmp"
    glue_relationalize_output_s3_path = "s3://bucket/output"
    dfc_root_table_name = "root"  # default value is "roottable"
    # End variables to customize with your information
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session  # reuse the existing GlueContext instead of creating a second one
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database=glue_source_database, table_name=glue_source_table,
                                                                transformation_ctx="datasource0")
    dfc = Relationalize.apply(frame=datasource0, staging_path=glue_temp_storage, name=dfc_root_table_name,
                              transformation_ctx="dfc")
    fb_data = dfc.select(dfc_root_table_name)
    dropnullfields3 = DropNullFields.apply(frame=fb_data, transformation_ctx="dropnullfields3")
    fb_dataoutput = glueContext.write_dynamic_frame.from_options(frame=dropnullfields3, connection_type="s3",
                                                                 connection_options={
                                                                     "path": glue_relationalize_output_s3_path},
                                                                 format="orc", transformation_ctx="fb_dataoutput")
    
    job.commit()
    

    Hope this helps!
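
    The script above stops at writing ORC files to S3. To cover the final step from the question (loading the flattened data into Redshift), a minimal sketch using Glue's JDBC writer could look like the following; it would go before job.commit() in the script above. The connection name "redshift-connection", the target table/database names, and the TempDir argument are hypothetical placeholders, not values from the original post.

    ## Hypothetical final step: write the flattened DynamicFrame to Redshift.
    ## Assumes 'TempDir' is added to the getResolvedOptions call above and that a
    ## Glue connection named "redshift-connection" points at the target cluster.
    redshift_output = glueContext.write_dynamic_frame.from_jdbc_conf(
        frame=dropnullfields3,
        catalog_connection="redshift-connection",  # hypothetical Glue connection name
        connection_options={
            "dbtable": "public.blogdata",  # hypothetical target table
            "database": "dev"              # hypothetical Redshift database
        },
        redshift_tmp_dir=args["TempDir"],  # S3 staging dir used for the Redshift COPY
        transformation_ctx="redshift_output"
    )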
