
AWS Glue to Redshift: Is it possible to replace, update, or delete data?


Here are a few bullet points on how I have things set up:

  • I upload CSV files to S3 and use a Glue crawler to create the table and schema.

  • I have a Glue job set up that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The job is also responsible for mapping the columns and creating the Redshift table.

By re-running the job I am getting duplicate rows in Redshift (as expected). However, is there a way to replace or delete rows before inserting the new data, using a key or the partition setup in Glue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

# Read the source table created by the crawler from the Glue Data Catalog
datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")

# Map the columns, resolve any ambiguous column types, and drop null fields
applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")

# Add a literal 'platform' column via the Spark DataFrame API, then convert back to a DynamicFrame
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift (a plain append - re-running the job inserts the same rows again)
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()

5 Answers

  • 1

    I tested this today and found a workaround to update/delete rows in the target table using a JDBC connection.

    I used the following:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job
    
    import pg8000
    args = getResolvedOptions(sys.argv, [
        'JOB_NAME',
        'PW',
        'HOST',
        'USER',
        'DB'
    ])
    # ...
    # Create Spark & Glue context
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    # ...
    config_port = ****
    conn = pg8000.connect(
        database=args['DB'], 
        user=args['USER'], 
        password=args['PW'],
        host=args['HOST'],
        port=config_port
    )
    query = "UPDATE table .....;"
    
    cur = conn.cursor()
    cur.execute(query)
    conn.commit()
    cur.close()
    
    
    
    query1 = "DELETE  AAA FROM  AAA A, BBB B WHERE  A.id = B.id"
    
    cur1 = conn.cursor()
    cur1.execute(query1)
    conn.commit()
    cur1.close()
    conn.close()
    
  • 0

    Job bookmarks are the key. Just edit the job and enable "Job bookmarks", and it will not process already processed data. Note that the job has to be re-run once before it detects that it does not have to reprocess the old data again.

    For more information, see: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

    The name "bookmark" is a bit far-fetched in my opinion. I would never have looked at it if I had not coincidentally stumbled upon it during my search.
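
    For reference, bookmarks only help if the script cooperates: sources need a transformation_ctx (the bookmark state is keyed on it), and the job must call job.init() and job.commit(). The question's script already does all of this, so enabling the "Job bookmark" property on the job (or passing --job-bookmark-option job-bookmark-enable as a job parameter) is the only remaining step. A minimal sketch of just the parts bookmarks depend on, reusing the question's database/table names:

    import sys
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.job import Job

    args = getResolvedOptions(sys.argv, ['TempDir', 'JOB_NAME'])
    sc = SparkContext()
    glueContext = GlueContext(sc)
    job = Job(glueContext)

    # job.init() restores the bookmark state recorded by previous runs
    job.init(args['JOB_NAME'], args)

    # The transformation_ctx is what the bookmark is keyed on; for an S3-backed
    # catalog table, only data not processed in earlier runs is read.
    datasource1 = glueContext.create_dynamic_frame.from_catalog(
        database = "db01",
        table_name = "table01",
        transformation_ctx = "datasource1")

    # ... transforms and the Redshift write, as in the question ...

    # job.commit() persists the new bookmark state; without it the next run
    # re-reads everything even with bookmarks enabled.
    job.commit()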

  • 8

    Here is the solution I got from AWS Glue Support:

    As you may know, although you can create primary keys, Redshift does not enforce uniqueness. Therefore, if you re-run a Glue job, duplicate rows can get inserted. Some of the ways to maintain uniqueness are:

    • Use a staging table to insert all rows and then perform an upsert/merge [1] into the main table; this has to be done outside of Glue.

    • Add another column to your Redshift table [1], such as an insert timestamp, to allow duplicates but still know which row came first or last, and then delete the duplicates when needed.

    • Load the previously inserted data into a dataframe and then compare it with the data to be inserted to avoid inserting duplicates [3] (a sketch follows the reference list below).

    [1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html and http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

    [2] - https://github.com/databricks/spark-redshift/issues/238

    [3] - https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html
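
    A minimal PySpark sketch of the third suggestion, assuming the key column is "id" and that the rows already in Redshift can be read back into a DataFrame. The jdbc_url / db_user / db_password values are placeholders for the details behind "Test Connection", and the appropriate Redshift/PostgreSQL JDBC driver must be available to Spark:

    from awsglue.dynamicframe import DynamicFrame

    # Rows about to be loaded, taken from the question's pipeline
    df_new = dropnullfields1.toDF()

    # Rows already present in the Redshift table, read back over plain Spark JDBC
    df_existing = spark.read.format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "table01") \
        .option("user", db_user) \
        .option("password", db_password) \
        .load()

    # Keep only rows whose key is not already in Redshift (left anti join, as in [3])
    df_to_insert = df_new.join(df_existing.select("id"), on="id", how="left_anti")

    # Convert back to a DynamicFrame and write it with the same sink as in the question
    data_to_insert = DynamicFrame.fromDF(df_to_insert, glueContext, "data_to_insert")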

  • 0

    As stated above, the job bookmark option in Glue should do the trick. I have been using it successfully when my source is S3. http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

  • 4

    Per my testing (with the same scenario), the BOOKMARK functionality did not work: duplicate data was still inserted when the job was run multiple times. I resolved this by removing the files from the S3 location daily (via Lambda) and implementing staging & target tables. Data is then inserted/updated based on the matching key columns.
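
    A minimal sketch of that staging/target merge, following the standard Redshift staging-table upsert pattern and issued over a pg8000 connection as in the first answer. The table names (table01 / table01_staging) and the key column id are placeholders, assuming the Glue job loads each run into the staging table:

    # conn: a pg8000 connection to Redshift, opened exactly as in the first answer
    cur = conn.cursor()

    # Delete target rows that will be replaced by the new batch (Redshift DELETE ... USING)
    cur.execute("DELETE FROM table01 USING table01_staging WHERE table01.id = table01_staging.id")

    # Copy the fresh rows from the staging table into the target
    cur.execute("INSERT INTO table01 SELECT * FROM table01_staging")
    conn.commit()

    # Empty the staging table for the next run (TRUNCATE commits implicitly in Redshift)
    cur.execute("TRUNCATE table01_staging")
    cur.close()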
