首页 文章

在pyspark中的数据框上按行划分操作或UDF

提问于
浏览
0

我必须在pyspark中实现pandas .apply(function,axis = 1)(以应用行方式功能) . 因为我是新手,我不确定它是否可以通过map函数或使用UDF来实现 . 我无法在任何地方找到任何类似的实现 .

基本上我只想将一行传递给一个函数做一些操作来创建新的列,这些列依赖于当前行和前一行的值,然后返回修改的行来创建一个新的数据帧 . 与熊猫一起使用的功能之一如下:

previous = 1
def row_operation(row):
    global previous
    if pd.isnull(row["PREV_COL_A"])==True or (row["COL_A"]) != (row["PREV_COL_A"]):
        current = 1
    elif row["COL_C"] > cutoff:
        current = previous +1
    elif row["COL_C"]<=cutoff:
        current = previous
    else:
        current = Nan
    previous = current
    return current

这里PREV_COL_A只是COL_A滞后1行 .

请注意,此函数是最简单的,但不会返回其他行 . 如果有人可以指导我如何在pyspark中实现行操作,那将是一个很大的帮助 . TIA

1 回答

  • 0

    你可以使用rdd.mapPartition . 它将为您提供行上的迭代器,并输出要返回的结果行 . 您给出的可迭代将不允许您向前或向后索引,只返回下一行 . 但是,您可以在处理任何需要执行的操作时保存行 . 例如

    def my_cool_function(rows):
        prev_rows = []
    
        for row in rows:
           # Do some processing with all the rows, and return a result
           yield my_new_row
    
           if len(prev_rows) >= 2:
               prev_rows = prev_rows[1:]
    
           prev_rows.append(row)
    
    updated_rdd = rdd.mapPartitions(my_cool_function)
    

    注意,为了示例,我使用了一个列表来跟踪分区,但python列表实际上是没有高效的头部推送/弹出方法的数组,因此您可能希望使用实际的Queue .

相关问题