首页 文章

Pyspark DataFrame - 如何使用变量进行连接?

提问于
浏览
3

我在python上使用Spark数据帧在两个数据帧上进行连接时遇到了一些麻烦 . 我有两个数据框,我必须更改列的名称,以使它们对每个数据框唯一,所以稍后我可以告诉哪个列是哪个 . 我这样做是为了重命名列(firstDf和secondDf是使用createDataFrame函数创建的Spark DataFrames):

oldColumns = firstDf.schema.names
newColumns = list(map(lambda x: "{}.{}".format('firstDf', x), oldColumns))
firstDf = firstDf.toDF(*newColumns)

我为第二个DataFrame重复了这个 . 然后我尝试使用以下代码加入它们:

from pyspark.sql.functions import *

firstColumn = 'firstDf.firstColumn'
secondColumn = 'secondDf.firstColumn'
joinedDF = firstDf.join(secondDf, col(firstColumn) == col(secondColumn), 'inner')

像这样使用它我得到以下错误:

AnalysisException“无法解析'firstDf.firstColumn'给定的输入列:[firstDf.firstColumn,...];”

这只是为了说明列存在于输入列数组中 .

如果我不重命名DataFrames列,我可以使用这段代码加入它们:

joinedDf = firstDf.join(secondDf, firstDf.firstColumn == secondDf.firstColumn, 'inner')

但这给了我一个含有模糊列名的DataFrame .

关于如何处理这个的任何想法?

1 回答

  • 1

    一般来说,不要在名称中使用点 . 这些具有特殊含义(可用于确定表或访问 struct 字段)并需要正确识别一些额外的工作 .

    对于equi连接,您只需要一个列名:

    from pyspark.sql.functions import col
    
    firstDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))
    secondDf = spark.createDataFrame([(1, "foo")], ("firstColumn", "secondColumn"))
    
    column = 'firstColumn'
    firstDf.join(secondDf, [column], 'inner')
    
    ## DataFrame[firstColumn: bigint, secondColumn: string, secondColumn: string]
    

    对于复杂的情况,请使用表别名:

    firstColumn = 'firstDf.firstColumn'
    secondColumn = 'secondDf.firstColumn'
    
    firstDf.alias("firstDf").join(
        secondDf.alias("secondDf"),
        # After alias prefix resolves to table name
        col(firstColumn) == col(secondColumn),
       "inner"
    )
    
    ## DataFrame[firstColumn: bigint, secondColumn: string, firstColumn: bigint, secondColumn: string]
    

    您也可以直接使用父框架:

    column = 'firstColumn'
    
    firstDf.join(secondDf, firstDf[column] == secondDf[column])
    

相关问题