首页 文章

将数据框列和外部列表传递给withColumn下的udf

提问于
浏览
10

我有一个具有以下结构的火花数据帧 . bodyText_token具有标记(处理/单词集) . 我有一个已定义关键字的嵌套列表

root
 |-- id: string (nullable = true)
 |-- body: string (nullable = true)
 |-- bodyText_token: array (nullable = true)

keyword_list=['union','workers','strike','pay','rally','free','immigration',],
['farmer','plants','fruits','workers'],['outside','field','party','clothes','fashions']]

我需要检查每个关键字列表下有多少令牌,并将结果添加为现有数据帧的新列 . 例如:如果 tokens =["become", "farmer","rally","workers","student"] ,结果将是 - > [1,2,0]

以下功能按预期工作 .

def label_maker_topic(tokens,topic_words):
    twt_list = []
    for i in range(0, len(topic_words)):
        count = 0
        #print(topic_words[i])
        for tkn in tokens:
            if tkn in topic_words[i]:
                count += 1
        twt_list.append(count)

    return twt_list

我在withColumn下使用了udf来访问该函数,但是我收到了一个错误 . 我认为这是关于将外部列表传递给udf . 有没有办法可以将外部列表和datafram列传递给udf并向我的数据帧添加新列?

topicWord = udf(label_maker_topic,StringType())
myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token,keyword_list))

2 回答

  • 24

    最干净的解决方案是使用闭包传递其他参数:

    def make_topic_word(topic_words):
         return udf(lambda c: label_maker_topic(c, topic_words))
    
    df = sc.parallelize([(["union"], )]).toDF(["tokens"])
    
    (df.withColumn("topics", make_topic_word(keyword_list)(col("tokens")))
        .show())
    

    这不需要对 keyword_list 或您使用UDF包装的函数进行任何更改 . 您还可以使用此方法传递任意对象 . 这可以用于传递例如 sets 的列表以进行有效的查找 .

    如果要使用当前的UDF并直接传递 topic_words ,则必须先将其转换为列文字:

    from pyspark.sql.functions import array, lit
    
    ks_lit = array(*[array(*[lit(k) for k in ks]) for ks in keyword_list])
    df.withColumn("ad", topicWord(col("tokens"), ks_lit)).show()
    

    根据您的数据和要求,可以使用替代的,更有效的解决方案,这些解决方案不需要UDF(爆炸聚合崩溃)或查找(散列矢量操作) .

  • 8

    以下工作正常可以将任何外部参数传递给UDF(一个经过调整的代码来帮助任何人)

    topicWord=udf(lambda tkn: label_maker_topic(tkn,topic_words),StringType())
    myDF=myDF.withColumn("topic_word_count",topicWord(myDF.bodyText_token))
    

相关问题