
Spark Dataframe: distinguishing columns with duplicated names


As I know, in a Spark Dataframe multiple columns can share the same name, as shown in the dataframe snapshot below:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

The result above was created by joining a dataframe with itself; you can see there are 4 columns, with both a and f appearing twice.

The problem is that when I try to do further calculations with the a column, I cannot find a way to select it. I have tried df[0] and df.select('a'), and both returned the error message below:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Is there any way in the Spark API to distinguish the columns from the duplicated names again? Or is there some way to let me change the column names?
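
For reference, here is a minimal sketch that reproduces the problem, assuming a SparkSession named spark and two dataframes with the same schema (a self-join behaves the same way); the values are simplified from the snapshot above:

from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame([Row(a=107831, f=0.0), Row(a=125231, f=0.0047)])
df2 = spark.createDataFrame([Row(a=107831, f=0.0), Row(a=145831, f=0.4132)])

# The join keeps the columns from both sides, so the result has a, f, a, f.
joined = df1.join(df2, df1['a'] == df2['a'])
print(joined.columns)  # ['a', 'f', 'a', 'f']

# Any unqualified reference to 'a' is now ambiguous:
# joined.select('a')  # AnalysisException: Reference 'a' is ambiguous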

7 Answers

  • 1

Let's say the DataFrames you want to join are df1 and df2, and you are joining them on column 'a'. Then you have two methods:

    Method 1

    df1.join(df2,'a','left_outer')

This is an excellent approach and it is strongly recommended.

    Method 2

    df1.join(df2, df1.a == df2.a, 'left_outer').drop(df2.a)

  • 57

    After digging into the Spark API, I found that I can first use alias to create an alias for the original dataframe, then use withColumnRenamed to manually rename every column on the alias, and finally perform the join without causing duplicated column names.

    For more details, refer to the Spark Dataframe APIs below:

    pyspark.sql.DataFrame.alias

    pyspark.sql.DataFrame.withColumnRenamed

    However, I think this is only a troublesome workaround, and I am wondering if there is a better way to solve my problem.
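
    A minimal sketch of this workaround, assuming the df1/df2 setup from the other answers (the left_/right_ prefixes are an arbitrary choice):

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.getOrCreate()
    df1 = spark.createDataFrame([Row(a=107831, f=0.0)])
    df2 = spark.createDataFrame([Row(a=107831, f=0.0)])

    # Alias each side, then rename its columns so that no name repeats.
    left = df1.alias('left').withColumnRenamed('a', 'left_a').withColumnRenamed('f', 'left_f')
    right = df2.alias('right').withColumnRenamed('a', 'right_a').withColumnRenamed('f', 'right_f')

    # The joined result has unique column names, so a plain select works again.
    left.join(right, left['left_a'] == right['right_a']).select('left_f').show()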

  • 4

    There is a simpler way than writing aliases for all of the columns, by doing the following:

    df1.join(df2,['a'])
    

    This works if the key that you are joining on is the same in both tables.

    https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html
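
    One caveat worth checking (a quick sketch with simplified df1/df2): only the join key is deduplicated, so other columns that share a name are still ambiguous afterwards:

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.getOrCreate()
    df1 = spark.createDataFrame([Row(a=107831, f=0.0)])
    df2 = spark.createDataFrame([Row(a=107831, f=0.0)])

    # 'a' is kept only once, but 'f' still appears twice:
    print(df1.join(df2, ['a']).columns)  # ['a', 'f', 'f']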

  • 29

    Let's start with some data:

    from pyspark.mllib.linalg import SparseVector
    from pyspark.sql import Row
    
    df1 = sqlContext.createDataFrame([
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
        Row(a=125231, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
    ])
    
    df2 = sqlContext.createDataFrame([
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
        Row(a=107831, f=SparseVector(
            5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    ])
    

    There are a few ways you can approach this problem. First of all, you can unambiguously reference the child table columns using the parent columns:

    df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)
    
    ##  +--------------------+
    ##  |                   f|
    ##  +--------------------+
    ##  |(5,[0,1,2,3,4],[0...|
    ##  |(5,[0,1,2,3,4],[0...|
    ##  +--------------------+
    

    You can also use table aliases:

    from pyspark.sql.functions import col
    
    df1_a = df1.alias("df1_a")
    df2_a = df2.alias("df2_a")
    
    df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)
    
    ##  +--------------------+
    ##  |                   f|
    ##  +--------------------+
    ##  |(5,[0,1,2,3,4],[0...|
    ##  |(5,[0,1,2,3,4],[0...|
    ##  +--------------------+
    

    Finally, you can rename the columns programmatically:

    df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
    df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))
    
    df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)
    
    ## +--------------------+
    ## |               f_df1|
    ## +--------------------+
    ## |(5,[0,1,2,3,4],[0...|
    ## |(5,[0,1,2,3,4],[0...|
    ## +--------------------+
    
  • 3

    I would recommend that you change the column names for your join:

    df1.select('a as "df1_a", 'f as "df1_f")
       .join(df2.select('a as "df2_a", 'f as "df2_f"), 'df1_a === 'df2_a)
    

    The resulting DataFrame will have the following schema:

    (df1_a, df1_f, df2_a, df2_f)
    
  • 0

    You can use the def drop(col: Column) method to drop the duplicated column, for example:

    DataFrame:df1
    
    +-------+-----+
    | a     | f   |
    +-------+-----+
    |107831 | ... |
    |107831 | ... |
    +-------+-----+
    
    DataFrame:df2
    
    +-------+-----+
    | a     | f   |
    +-------+-----+
    |107831 | ... |
    |107831 | ... |
    +-------+-----+
    

    When I join df1 with df2, the DataFrame will look like this:

    val newDf = df1.join(df2,df1("a")===df2("a"))
    
    DataFrame:newDf
    
    +-------+-----+-------+-----+
    | a     | f   | a     | f   |
    +-------+-----+-------+-----+
    |107831 | ... |107831 | ... |
    |107831 | ... |107831 | ... |
    +-------+-----+-------+-----+
    

    Now, we can use the def drop(col: Column) method to drop the duplicated column 'a' or 'f', like this:

    val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
    
  • 4

    This is how we can join two Dataframes on the same column names in PySpark:

    df = df1.join(df2, ['col1','col2','col3'])
    

    If you do printSchema() after this, you can see that the duplicated columns have been removed.
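
    A quick sketch of what that looks like (the col1/col2/col3 names and the extra x/y columns are hypothetical):

    from pyspark.sql import Row, SparkSession

    spark = SparkSession.builder.getOrCreate()
    df1 = spark.createDataFrame([Row(col1=1, col2=2, col3=3, x=10)])
    df2 = spark.createDataFrame([Row(col1=1, col2=2, col3=3, y=20)])

    df = df1.join(df2, ['col1', 'col2', 'col3'])
    df.printSchema()
    # root
    #  |-- col1: long (nullable = true)
    #  |-- col2: long (nullable = true)
    #  |-- col3: long (nullable = true)
    #  |-- x: long (nullable = true)
    #  |-- y: long (nullable = true)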
