我有两张 table 要交叉加入,
表1:查询300M行表2:产品描述3000行
以下查询执行交叉连接并计算元组之间的分数,并选择前3个匹配,
query_df.repartition(10000).registerTempTable('queries')
product_df.coalesce(1).registerTempTable('products')
CREATE TABLE matches AS
SELECT *
FROM
(SELECT *,
row_number() over (partition BY a.query_id
ORDER BY 0.40 + 0.15*score_a + 0.20*score_b + 0.5*score_c DESC) AS rank
FROM
(SELECT /*+ MAPJOIN(b) */ a.query_id,
b.product_id,
func_a(a.qvec,b.pvec) AS score_a,
func_b(a.qvec,b.pvec) AS score_b,
func_c(a.qvec,b.pvec) AS score_c
FROM queries a CROSS
JOIN products b) a) a
WHERE rn <= 3
我的火花群看起来如下,
MASTER =“yarn-client”/opt/mapr/spark/spark-1.6.1/bin/pyspark --num-executors 22 --executor-memory 30g --executor-cores 7 --driver-memory 10g - conf spark.yarn.executor.memoryOverhead = 10000 --conf spark.akka.frameSize = 2047
现在问题是,正如预期的那样,由于内存泄漏,由于产生极大的临时数据,作业在几个阶段后失败 . 我正在寻找一些优化上述操作的帮助/建议,使得作业应该能够在以并行方式选择下一个query_id之前运行 query_id 的匹配和过滤操作 - 类似于在for循环中对查询表进行排序 . 如果工作缓慢但成功,我很好,因为我可以请求更大的集群 .
上面的查询适用于较小的查询表,例如10000个记录 .
2 回答
在您希望将表A(大)与表B(小)连接的场景中,最佳做法是利用 broadcast join .
https://stackoverflow.com/a/39404486/1203837中给出了清晰的概述 .
希望这可以帮助 .
笛卡尔连接或交叉连接火花是非常昂贵的 . 我建议使用内连接加入表并首先保存输出数据 . 然后使用该数据帧进行进一步聚合 .
如果较小的表不够小,那么 Map 连接或广播连接的一个小建议可能会失败 . 除非你确定使用广播连接的小表格大小 .