首页 文章

从AWS Glue升级到Amazon Redshift

提问于
浏览
0

据我所知,没有直接的UPSERT查询可以直接从Glue到Redshift执行 . 是否可以在胶水脚本本身中实现临时表概念?

所以我的期望是创建临时表,将其与目标表合并,最后删除它 . 可以在Glue脚本中实现吗?

3 回答

  • 0

    是的,它可以完全实现 . 您只需将pg8000模块导入胶水作业即可 . pg8000模块是python库,用于与Amazon Redshift Build 连接并通过游标执行SQL查询 . Python模块参考:https://github.com/mfenniak/pg8000然后,通过 pg8000.connect(user='user',database='dbname',host='hosturl',port=5439,password='urpasswrd') 与目标集群 Build 连接并使用Glue,s datasink选项加载到staging表中,然后使用pg8000游标运行upsert sql query

    >>> import pg8000
    >>> conn = pg8000.connect(user='user',database='dbname',host='hosturl',port=5439,password='urpasswrd')
    >>> cursor = conn.cursor()
    >>> cursor.execute("CREATE TEMPORARY TABLE book (id SERIAL, title TEXT)")
    >>> cursor.execute("INSERT INTO TABLE final_target"))
    >>> conn.commit()
    

    您需要压缩pg8000软件包并将其放在s3存储桶中,并将其引用到Glue Job部分的Advanced options / Job parameters下的Python Libraries路径 .

  • 0

    通过将'postactions'选项传递给JDBC sink,可以使用Glue中的staging表实现upsert到Redshift中:

    val destinationTable = "upsert_test"
    val destination = s"dev_sandbox.${destinationTable}"
    val staging = s"dev_sandbox.${destinationTable}_staging"
    
    val fields = datasetDf.toDF().columns.mkString(",")
    
    val postActions =
      s"""
         DELETE FROM $destination USING $staging AS S
            WHERE $destinationTable.id = S.id
              AND $destinationTable.date = S.date;
         INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
         DROP TABLE IF EXISTS $staging
      """
    
    // Write data to staging table in Redshift
    glueContext.getJDBCSink(
      catalogConnection = "redshift-glue-connections-test",
      options = JsonOptions(Map(
        "database" -> "conndb",
        "dbtable" -> staging,
        "overwrite" -> "true",
        "postactions" -> postActions
      )),
      redshiftTmpDir = s"$tempDir/redshift",
      transformationContext = "redshift-output"
    ).writeDynamicFrame(datasetDf)
    

    确保用于写入Redshift的用户具有足够的权限来在分段模式中创建/删除表 .

  • 1

    AWS Glue支持Spark和Databricks库,因此您可以使用spark / Pyspark数据库来覆盖表:

    df.write\
      .format("com.databricks.spark.redshift")\
      .option("url", redshift_url)\
      .option("dbtable", redshift_table)\
      .option("user", user)\
      .option("password", readshift_password)\
      .option("aws_iam_role", redshift_copy_role)\
      .option("tempdir", args["TempDir"])\
      .mode("overwrite")\
      .save()
    

    Per Databricks / Spark文档:

    覆盖现有表:默认情况下,此库使用事务来执行覆盖,这是通过删除目标表,创建新的空表并向其追加行来实现的 .

    您可以查看here中的数据库文档

相关问题