我有一个具有以下架构的DataFrame,

owner_id|media_id|taken_time|sid|previous_taken_time|previous_media_id|previous_sid|

我有以下udf,我想使用以下窗口在表的每一行上应用它

window = Window.partitionBy(medias['owner_id']).orderBy(medias['taken_time'])

UDF

def create_update_stories(row):
    if row['previous_taken_time']:
        if row['previous_taken_time'].total_seconds() - row['taken_time'].total_seconds() < 60 * 60:
            if row['previous_storyid'] == row['previous_media_id']:
                sid = row['previous_sid'] + "_" + row['media_id']
                row['sid'] = sid
            else:
                row['sid'] = row['previous_sid']
    return row

现在我用以下方式调用它,

spark.udf.register("create_update_stories", create_update_stories)
medias = medias.withColumn("updated_story_id", create_update_stories(row).over(window))

这会引发以下错误,

NameError: name 'row' is not defined

调用此函数的正确方法是什么?任何帮助赞赏 .