import com.amazonaws.services.glue.util.JsonOptions

val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"

// Comma-separated column list taken from the incoming DynamicFrame
val fields = datasetDf.toDF().columns.mkString(",")

// SQL that Redshift runs after the staging table has been loaded:
// delete matching rows from the destination, copy the staging rows in,
// then drop the staging table
val postActions =
  s"""
     DELETE FROM $destination USING $staging AS S
       WHERE $destinationTable.id = S.id
         AND $destinationTable.date = S.date;
     INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
     DROP TABLE IF EXISTS $staging
  """

// Write data to the staging table in Redshift; "postactions" performs the upsert
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "postactions" -> postActions
  )),
  redshiftTmpDir = s"$tempDir/redshift",
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
3 Answers
Yes, this can definitely be achieved. All you need to do is import the pg8000 module into your Glue job. pg8000 is a Python library used to establish a connection to Amazon Redshift and execute SQL queries through a cursor. Python module reference: https://github.com/mfenniak/pg8000
Then establish a connection to the target cluster with
pg8000.connect(user='user', database='dbname', host='hosturl', port=5439, password='urpasswrd')
load the staging table using Glue's datasink option, and then run the upsert SQL query with a pg8000 cursor (a sketch follows below). You will need to zip the pg8000 package, put it in an S3 bucket, and reference it in the Python libraries path under Advanced options / Job parameters in the Glue job section.
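Below is a minimal sketch of this approach, assuming the staging table has already been loaded by the Glue sink. The cluster endpoint, credentials, and table/column names are placeholders invented for illustration, not values from the original answer.

import pg8000

# Placeholder connection details -- replace with your own cluster endpoint,
# database, user, and password (hypothetical values for illustration only)
conn = pg8000.connect(
    user='glue_user',
    database='conndb',
    host='my-cluster.xxxxxxxx.us-east-1.redshift.amazonaws.com',
    port=5439,
    password='...'
)

# Hypothetical upsert: delete rows that the staging data will replace,
# copy the staging rows into the destination, then drop the staging table
statements = [
    """DELETE FROM dev_sandbox.upsert_test USING dev_sandbox.upsert_test_staging AS s
       WHERE upsert_test.id = s.id AND upsert_test.date = s.date""",
    "INSERT INTO dev_sandbox.upsert_test SELECT * FROM dev_sandbox.upsert_test_staging",
    "DROP TABLE IF EXISTS dev_sandbox.upsert_test_staging",
]

cursor = conn.cursor()
try:
    for stmt in statements:   # execute each statement separately through the cursor
        cursor.execute(stmt)
    conn.commit()             # pg8000 does not autocommit by default
finally:
    cursor.close()
    conn.close()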
Upsert into Redshift can be achieved using a staging table in Glue by passing the 'postactions' option to the JDBC sink, as in the Scala example at the top of this page:
Make sure the user used to write to Redshift has sufficient permissions to create/drop tables in the staging schema.
AWS Glue supports Spark and Databricks libraries, so you can use the Spark/PySpark Databricks library to do an overwrite of the table:
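A minimal PySpark sketch of such an overwrite, assuming the Databricks spark-redshift data source is available to the Glue job; the JDBC URL, IAM role, S3 temp directory, and table name are placeholder values, and df stands for the Spark DataFrame to be written (e.g. datasetDf.toDF()):

# Hypothetical example: overwrite a Redshift table from a Spark DataFrame
# using the Databricks spark-redshift data source. All option values below
# are placeholders.
(df.write
   .format("com.databricks.spark.redshift")
   .option("url", "jdbc:redshift://my-cluster:5439/conndb?user=glue_user&password=...")
   .option("dbtable", "dev_sandbox.upsert_test")
   .option("tempdir", "s3://my-bucket/tmp/redshift/")
   .option("aws_iam_role", "arn:aws:iam::123456789012:role/my-redshift-role")
   .mode("overwrite")   # replaces the existing contents of the table
   .save())

Note that an overwrite replaces the entire table rather than merging rows, so it suits a full refresh rather than the row-level upsert achieved with the staging-table approach above.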
Per the Databricks/Spark documentation, overwrite mode replaces the existing contents of the table; you can take a look at the Databricks documentation for the details.