是否可以使用UDF中不可行的复杂函数更新pyspark中的hiveContext数据帧列?
我有一个包含许多列的数据框,其中2列称为时间戳和数据 . 我需要从数据中的JSON字符串中检索时间戳,如果数据中的时间戳符合某些条件,则更新时间戳列 . 我知道数据帧是不可变的,但有可能以某种方式构建一个新的数据帧,保留旧数据帧的所有列,但更新timstamp列?
代码说明了我想做的事情:
def updateTime(row):
import json
THRESHOLD_TIME = 60 * 30
client_timestamp = json.loads(row['data'])
client_timestamp = float(client_timestamp['timestamp'])
server_timestamp = float(row['timestamp'])
if server_timestamp - client_timestamp <= THRESHOLD_TIME:
new_row = ..... # copy contents of row
new_row['timestamp'] = client_timestamp
return new_row
else:
return row
df = df.map(updateTime)
我想过将行内容映射到一个元组,然后用.toDF()将其转换回数据帧,但我找不到将行内容复制到元组然后返回列名的方法 .
1 回答
如果您调整
updateTime
函数以接收Timestamp作为参数并返回新处理的时间戳,则可以创建UDF并直接在DataFrame列上使用它:但是,在你的情况下,我认为它有点复杂:
使用第二种方法,函数
getClientTime
从data
列接收一个值,并返回该值的客户机时间戳 . 然后,您可以使用它来创建包含此信息的新列(client_timestamp
) . 最后,您可以使用when根据server_timestamp
列和新创建的client_timestamp
列的值有条件地创建新列 .参考:
有关在Spark中使用UDF的更多信息,可以查看以下文章:http://www.sparktutorials.net/using-sparksql-udfs-to-create-date-times-in-spark-1.5
我使用的函数的主要文档可以在pyspark API docs中找到