首页 文章

在 Spark 中保存具有非常大值的数据框

提问于
浏览
0

使用 Spark 数据帧,我正在执行 groupBy 操作,以将与键关联的所有值收集到列表中。所收集值的大小可能相差很大。实际上,我正在尝试通过合并用于后期处理的组合键的值来生成“文档”。

为了说明这一点,df 是具有 3 个字符串列 A,B,C 的数据帧。

df.groupBy(concat($"A", lit("-"), $"B").alias("Key")).agg(collect_list($"C").alias("values"))

运行此查询以获取两行是可行的,这意味着该命令是正确的。

但是,当我尝试将完整的输出另存为 CSV 压缩文件或 Parquet 时,此过程由于以下几个原因而失败,包括内存问题(我试图进行调整)和加密加密。

我怀疑某些值非常大是问题所在。是否有针对此类情况的最佳做法?

1 回答

  • 0

    尽管确切地知道遇到什么错误会有所帮助,但由于太多数据流向一行,这一事实很可能会引起您的问题。为了克服这个问题,您可以使用随机列添加一些人工分区。这样,您的分组数据将在多行之间共享,然后在多个文件中共享,从而防止发生 OOM 错误。这是您可以执行的操作:

    val df = sc.parallelize(Seq((1, 2, 3), (1, 2, 4), (1, 2, 5), (1, 3, 2)))
        .toDF("A", "B", "C")
    

    您要尝试的是这样做,并且 C 可能会变得太大。

    df.groupBy("A", "B")
      .agg(collect_list('C))
      .show
    +---+---+---------------+                                                       
    |  A|  B|collect_list(C)|
    +---+---+---------------+
    |  1|  2|      [3, 4, 5]|
    |  1|  3|            [2]|
    +---+---+---------------+
    

    相反,您可以添加随机列 R 来防止 C 变得太大。数据将在 R 的多个值之间拆分,从而减少了可能在一行中传递的数据量。在此,我为示例使用了 3 个可能的随机值,但在您的情况下,必须使用更大的值。

    val new_df = df.withColumn("R", floor(rand()*3)).groupBy("A", "B", "R").agg(collect_list('C) as "C").show
    new_df.show
    +---+---+---+------+
    |  A|  B|  R|     C|
    +---+---+---+------+
    |  1|  2|  1|   [5]|
    |  1|  2|  2|[3, 4]|
    |  1|  3|  1|   [2]|
    +---+---+---+------+
    

    然后,您可以像这样编写分区数据框。

    new_df.write.partitionBy("A", "B", "R").parquet("...")
    

相关问题