首页 文章

spark cross加入内存泄漏

提问于
浏览
0

我有两张 table 要交叉加入,

表1:查询300M行表2:产品描述3000行

以下查询执行交叉连接并计算元组之间的分数,并选择前3个匹配,

query_df.repartition(10000).registerTempTable('queries')

product_df.coalesce(1).registerTempTable('products')

CREATE TABLE matches AS
SELECT *
FROM
  (SELECT *,
          row_number() over (partition BY a.query_id
                             ORDER BY 0.40 + 0.15*score_a + 0.20*score_b + 0.5*score_c DESC) AS rank
   FROM
     (SELECT /*+ MAPJOIN(b) */ a.query_id,
                               b.product_id,
                               func_a(a.qvec,b.pvec) AS score_a,
                               func_b(a.qvec,b.pvec) AS score_b,
                               func_c(a.qvec,b.pvec) AS score_c
      FROM queries a CROSS
      JOIN products b) a) a
WHERE rn <= 3

我的火花群看起来如下,

MASTER =“yarn-client”/opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 22 --executor-memory 30g --executor-cores 7 --driver-memory 10g - conf spark.yarn.executor.memoryOverhead = 10000 --conf spark.akka.frameSize = 2047

现在问题是,正如预期的那样,由于内存泄漏,由于产生极大的临时数据,作业在几个阶段后失败 . 我正在寻找一些优化上述操作的帮助/建议,使得作业应该能够在以并行方式选择下一个query_id之前运行 query_id 的匹配和过滤操作 - 类似于在for循环中对查询表进行排序 . 如果工作缓慢但成功,我很好,因为我可以请求更大的集群 .

上面的查询适用于较小的查询表,例如10000个记录 .

2 回答

  • 0

    在您希望将表A(大)与表B(小)连接的场景中,最佳做法是利用 broadcast join .

    https://stackoverflow.com/a/39404486/1203837中给出了清晰的概述 .

    希望这可以帮助 .

  • 0

    笛卡尔连接或交叉连接火花是非常昂贵的 . 我建议使用内连接加入表并首先保存输出数据 . 然后使用该数据帧进行进一步聚合 .

    如果较小的表不够小,那么 Map 连接或广播连接的一个小建议可能会失败 . 除非你确定使用广播连接的小表格大小 .

相关问题