首页 文章

Spark SQL广播散列连接

提问于
浏览
4

我正在尝试使用SparkSQL在数据帧上执行广播散列连接,如下所示:https://docs.cloud.databricks.com/docs/latest/databricks_guide/06%20Spark%20SQL%20%26%20DataFrames/05%20BroadcastHashJoin%20-%20scala.html

在那个例子中,(小) DataFrame 通过saveAsTable持久化,然后通过spark SQL连接(即通过 sqlContext.sql("..."))

我遇到的问题是我需要使用sparkSQL API来构造我的SQL(我还要加入~50个带有ID列表的表,并且不想手工编写SQL) .

How do I tell spark to use the broadcast hash join via the API?  The issue is that if I load the ID list (from the table persisted via `saveAsTable`) into a `DataFrame` to use in the join, it isn't clear to me if Spark can apply the broadcast hash join.

3 回答

  • 15

    您可以使用 broadcast 函数明确标记 DataFrame 足够广播:

    Python

    from pyspark.sql.functions import broadcast
    
    small_df = ...
    large_df = ...
    
    large_df.join(broadcast(small_df), ["foo"])
    

    或广播提示(Spark> = 2.2):

    large_df.join(small_df.hint("broadcast"), ["foo"])
    

    Scala

    import org.apache.spark.sql.functions.broadcast
    
    val smallDF: DataFrame = ???
    val largeDF: DataFrame = ???
    
    largeDF.join(broadcast(smallDF), Seq("foo"))
    

    或广播提示(Spark> = 2.2):

    largeDF.join(smallDF.hint("broadcast"), Seq("foo"))
    

    SQL

    您可以使用提示(Spark >= 2.2):

    SELECT /*+ MAPJOIN(small) */ * 
    FROM large JOIN small
    ON large.foo = small.foo
    

    要么

    SELECT /*+  BROADCASTJOIN(small) */ * 
    FROM large JOIN small
    ON large.foo = small.foo
    

    要么

    SELECT /*+ BROADCAST(small) */ * 
    FROM large JOIN small
    ON larger.foo = small.foo
    

    R (SparkR):

    使用 hint (Spark> = 2.2):

    join(large, hint(small, "broadcast"), large$foo == small$foo)
    

    随着 broadcast (Spark> = 2.3)

    join(large, broadcast(small), large$foo == small$foo)
    

    Note

    如果其中一个结构相对较小,则广播连接很有用 . 否则它可能比完全洗牌要贵得多 .

  • 4
    jon_rdd = sqlContext.sql( "select * from people_in_india  p
                                join states s
                                on p.state = s.name")
    
    
    jon_rdd.toDebugString() / join_rdd.explain() :
    

    shuffledHashJoin :
    印度的所有数据将被改组为每个州只有29个密钥 . 问题:不均匀的分片 . 与29个输出分区有限的并行性 .

    broadcaseHashJoin

    将小RDD广播到所有工作节点 . 大型rdd的并行性仍然保持不变,甚至不需要洗牌 .
    enter image description here

    PS:图像可能很丑,但信息量很大 .

  • 2

    通过广播连接,连接方程的一侧正在实现并发送给所有映射器 . 因此,它被视为 Map 侧连接 .

    随着数据集的实现和通过网络发送,如果数据集相当小,它只会带来显着的性能提升 .

    所以如果你想要执行 smallDF.join(largeDF)

    Wait..!!! another constraint is that it also needs to fit completely into the memory of each executor.It also needs to fit into the memory of the Driver!

    使用Torrent协议即Peer-to-Peer协议在 Actuator 之间共享广播变量,并且Torrent协议的优点是对等体彼此共享文件块而不依赖于保存所有块的中央实体 .

    上面提到的例子足以开始播放广播连接 .

    Note: Cannot modify value after creation. If you try, change will only be on one&node

相关问题