首页 文章

更新具有复杂功能的pyspark数据帧列

提问于
浏览
0

是否可以使用UDF中不可行的复杂函数更新pyspark中的hiveContext数据帧列?

我有一个包含许多列的数据框,其中2列称为时间戳和数据 . 我需要从数据中的JSON字符串中检索时间戳,如果数据中的时间戳符合某些条件,则更新时间戳列 . 我知道数据帧是不可变的,但有可能以某种方式构建一个新的数据帧,保留旧数据帧的所有列,但更新timstamp列?

代码说明了我想做的事情:

def updateTime(row):
    import json

    THRESHOLD_TIME = 60 * 30
    client_timestamp = json.loads(row['data'])
    client_timestamp = float(client_timestamp['timestamp'])
    server_timestamp = float(row['timestamp'])
    if server_timestamp - client_timestamp <= THRESHOLD_TIME:
        new_row = .....  # copy contents of row
        new_row['timestamp'] = client_timestamp
        return new_row
    else:
        return row

df = df.map(updateTime)

我想过将行内容映射到一个元组,然后用.toDF()将其转换回数据帧,但我找不到将行内容复制到元组然后返回列名的方法 .

1 回答

  • 0

    如果您调整 updateTime 函数以接收Timestamp作为参数并返回新处理的时间戳,则可以创建UDF并直接在DataFrame列上使用它:

    from pyspark.sql.functions import *
    from pyspark.sql.types import TimestampType
    
    myUDF = udf(updateTime, TimestampType())
    df = df.withColumn("timestamp", myUDF(col("timestamp"))
    

    但是,在你的情况下,我认为它有点复杂:

    from pyspark.sql.functions import *
    from pyspark.sql.types import TimestampType
    
    myUDF = udf(getClientTime, TimestampType())
    client_timestamp = myUDF(col("data"))
    server_timestamp = col("timestamp")
    condition = server_timestamp.cast("float") - client_timestamp.cast("float") <= THRESHOLD_TIME    
    
    newCol =  when(condition, client_timestamp).otherwise(server_timestamp) 
    newDF = df.withColumn("new_timestamp", newCol)
    

    使用第二种方法,函数 getClientTimedata 列接收一个值,并返回该值的客户机时间戳 . 然后,您可以使用它来创建包含此信息的新列( client_timestamp ) . 最后,您可以使用when根据 server_timestamp 列和新创建的 client_timestamp 列的值有条件地创建新列 .

    参考:

相关问题