首页 文章

为spark数据框中的用户创建每个项目的每一行

提问于
浏览
2

我有一个如下的火花数据框:

User  Item    Purchased
1   A   1
1   B   2
2   A   3
2   C   4
3   A   3
3   B   2
3   D   6

only showing top 5 rows

每个用户对他们购买的商品都有一行 . 假设Purhcased是多少数量 . 购买(计数) .

但是,有些用户可能没有购买的项目,因此该项目特定用户没有行 . 我们只有用户购买的商品的行 . 因此,如果用户1购买了项目A,B,则对于这两个项目,我们有2行用户1 . 但是如果用户2购买了A,则C然后用户2具有项目A和C但没有B的行 . 我想最终每个用户应该拥有表格中所有项目的所有行以及每个项目的相应计数 .

我想将此数据帧转换为如上所述的数据帧,但也包含用户尚未看到的项的行,并将相应的计数赋予零 .

如下所示:

User    Item    Purchased
1   A   1
1   B   2
1   C   0
1   D   0
2   A   3
2   C   4
2   B   0
2   D   0
3   A   3
3   B   2
3   D   6
3   C   0
only showing top 5 rows

我想的一种方法是,如果我在第一个数据帧上使用sqlContext的cross_tab方法,那么我可以将每一行转换为具有相应值的列 . 对于用户没有它的项目,将为其创建一个列,并在那里放零 .

但那么如何将这些列转换回行?它也可能是一种迂回的方式 .

谢谢

2 回答

  • 1

    我们也可以通过仅使用 df 函数来实现这一点 .

    orders = [(1,"A",1),(1,"B",2),(2,"A",3),(2,"C",4),(3,"A",3),(3,"B",2),(3,"D",6)]
    df = sqlContext.createDataFrame(orders, ["user","item","purchased"])
    df_items = df.select("item").distinct().repartition(5).withColumnRenamed("item", "item_1")
    df_users = df.select("user").distinct().repartition(5).withColumnRenamed("user", "user_1")
    df_cartesian = df_users.join(df_items)
    //above expression returns cartesian product of users and items dfs
    joined_df = df_cartesian.join(df, [df_cartesian.user_1==df.user, df_cartesian.item_1==df.item], "left_outer").drop("user").drop("item")
    result_df = joined_df.fillna(0,["purchased"]).withColumnRenamed("item_1", "item").withColumnRenamed("user_1", "user")
    

    最后, result_df.show() 产生如下所示的欲望输出:

    +----+----+---------+
    |user|item|purchased|
    +----+----+---------+
    |   2|   A|        3|
    |   2|   B|        0|
    |   2|   C|        4|
    |   2|   D|        0|
    |   3|   A|        3|
    |   3|   B|        2|
    |   3|   C|        0|
    |   3|   D|        6|
    |   1|   A|        1|
    |   1|   B|        2|
    |   1|   C|        0|
    |   1|   D|        0|
    +----+----+---------+
    
  • 1
    df = sqlContext.createDataFrame([(1, 'A', 2), (1, 'B', 3), (2, 'A', 2)], ['user', 'item', 'purchased'])
    pivot = df.groupBy('user').pivot('item').sum('purchased').fillna(0)
    items = [i['item'] for i in df.select('item').distinct().collect()]
    flattened_rdd = pivot.rdd.flatMap(lambda x: [(x['user'], i, x[i]) for i in items])
    sqlContext.createDataFrame(flattened_rdd, ["user", "item", "purchased"]).show()
    

相关问题