Adding a timestamp column when importing data into Redshift with an AWS Glue Job

I would like to know whether it is possible to add a timestamp column to a table when it is loaded by an AWS Glue Job.

First scenario:

Column A | Column B | TimeStamp
A        | 2        | 2018-06-03 23:59:00.0

When the Crawler updates the table in the Data Catalog and the job runs again, the new data should be added to the table with a new timestamp.

Column A | Column B | TimeStamp
A        | 4        | 2018-06-04 05:01:31.0
B        | 8        | 2018-06-04 06:02:31.0

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ColumnA", "char", "ColumnA", "char"), ("ColumnB", "char", "ColumnB", "char")], transformation_ctx = "applymapping1")
resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = dropnullfields3, catalog_connection = "TESTDB", connection_options = {"dbtable": "TABLEA", "database": "anasightprd01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")

2 Answers

  • 2

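    While there may be a way to get the current datetime in the Glue code, another common way to timestamp data on insert is to add a TIMESTAMP column to the Redshift table with a default value of GETDATE():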

    CREATE TABLE myschema.mytable
    (
        ... OTHER Fields here
        insertedtimestamp TIMESTAMP WITH TIME ZONE DEFAULT(GETDATE())
    );
    

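    The trick on insert is to make sure the insertedtimestamp column is not listed among the INSERT INTO / COPY columns, so that the default applies as rows are added to the table: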

    INSERT INTO myschema.mytable(Col1, Col2 ...) -- NB no `insertedtimestamp` column
    VALUES ('col1', 'col2' ...);
    
    • The value of insertedtimestamp is then stamped automatically.
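
The default-column behavior described in this answer can be tried locally without a Redshift cluster. The following is only a minimal sketch under stated assumptions: an in-memory SQLite database stands in for Redshift, SQLite's CURRENT_TIMESTAMP stands in for GETDATE(), and the table and column names (mytable, col1, col2) are placeholders rather than anything from the asker's schema.

```python
import sqlite3

# Assumption: in-memory SQLite is used purely as a stand-in for Redshift,
# and CURRENT_TIMESTAMP plays the role of Redshift's GETDATE().
conn = sqlite3.connect(":memory:")
conn.execute(
    """
    CREATE TABLE mytable (
        col1 TEXT,
        col2 TEXT,
        insertedtimestamp TEXT DEFAULT CURRENT_TIMESTAMP
    )
    """
)

# The column list deliberately omits insertedtimestamp, so the database
# fills it in from the column default.
conn.execute("INSERT INTO mytable (col1, col2) VALUES (?, ?)", ("A", "2"))
conn.commit()

row = conn.execute(
    "SELECT col1, col2, insertedtimestamp FROM mytable"
).fetchone()
print(row)  # the third element is the insert time supplied by the database
```

If the insert had listed insertedtimestamp explicitly, the supplied value would be stored instead of the default, which is why the column has to be left out of the column list.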
  • 1

    Convert the DynamicFrame to a Spark DataFrame, add a new column containing the current timestamp, then convert it back to a DynamicFrame before writing. In Scala:

    import org.apache.spark.sql.functions._
    
    ...
    
    val timestampedDf = dropnullfields3.toDF().withColumn("TimeStamp", current_timestamp())
    val timestamped4 = DynamicFrame(timestampedDf, glueContext)
    

    In your case, the Python code would look like this:

    import sys
    from awsglue.transforms import *
    from awsglue.utils import getResolvedOptions
    from pyspark.context import SparkContext
    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from awsglue.job import Job
    from pyspark.sql.functions import current_timestamp
    
    ## @params: [TempDir, JOB_NAME]
    args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])
    
    sc = SparkContext()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)
    job.init(args['JOB_NAME'], args)
    
    datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sampledb", table_name = "abs", transformation_ctx = "datasource0")
    applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ColumnA", "char", "ColumnA", "char"), ("ColumnB", "char", "ColumnB", "char")], transformation_ctx = "applymapping1")
    resolvechoice2 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice2")
    dropnullfields3 = DropNullFields.apply(frame = resolvechoice2, transformation_ctx = "dropnullfields3")
    # add TimeStamp column
    timestampedDf = dropnullfields3.toDF().withColumn("TimeStamp", current_timestamp())
    timestamped4 = DynamicFrame.fromDF(timestampedDf, glueContext, "timestampedDf")
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = timestamped4, catalog_connection = "TESTDB", connection_options = {"dbtable": "TABLEA", "database": "anasightprd01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink4")
    # finish the job run so that job bookmark state (if enabled) is saved
    job.commit()
    
