请帮助初学者 . 订单放置的常用数据库,全部在一个表中 .
使用Python分析Apache Spark中的数据 . 想要编写一个查询,以便通过电子邮件排序客户的所有交易,这些客户订购的产品现已停产,并且订单尚未发货 . 基本上用“&item_in_list(F.lit(”NotShipped“),ShippedStatus)”它不起作用 .
%python
import pyspark.sql.functions as F
from pyspark.sql.types import *
list_len = F.udf(lambda x: len(x), IntegerType())
item_in_list = F.udf(lambda x, y: x in y, BooleanType())
df = spark.sql("select * from orderdb")
df1 = df.select("email", "OrderedProduct","ShippedStatus").groupBy("email")
df1 = df1.agg(F.collect_set("OrderedProduct"))\
.withColumnRenamed("collect_set(OrderedProduct)", "OrderedProduct")
df1 = df1.filter((list_len(df1.OrderedProduct) > 1) &
item_in_list(F.lit("DiscontinuedProduct"), OrderedProduct)
&item_in_list(F.lit("NotShipped"), ShippedStatus)
df1 = df1.select("email")
df = df1.join(df, "email", "left_outer")
display(df)
ID字符串null date DateTimestamp null OrderedProduct string null ShippedStatus boolean null
1 回答
首先, udf's perform very bad in pyspark . 如果要更改类型,请使用以下内容:
话虽这么说,我们需要一个可重复的例子,但我想你可以用
'where'
子句解决它 .