首页 文章

如何在PySpark DataFrame中替换无穷大

提问于
浏览
1

似乎没有支持替换无限值 . 我尝试了下面的代码,但它不起作用 . 还是我错过了什么?

a=sqlContext.createDataFrame([(None, None), (1, np.inf), (None, 2)])
a.replace(np.inf, 10)

或者我必须采取痛苦的路线:将PySpark DataFrame转换为pandas DataFrame,替换无穷大值,并将其转换回PySpark DataFrame

1 回答

  • 2

    似乎没有支持替换无限值 .

    实际上它看起来像是一个Py4J bug而不是 replace 本身的问题 . 见Support nan/inf between Python and Java .

    作为解决方法,您可以尝试UDF(慢速选项):

    from pyspark.sql.types import DoubleType
    from pyspark.sql.functions import col, lit, udf, when
    
    df = sc.parallelize([(None, None), (1.0, np.inf), (None, 2.0)]).toDF(["x", "y"])
    
    replace_infs_udf = udf(
        lambda x, v: float(v) if x and np.isinf(x) else x, DoubleType()
    )
    
    df.withColumn("x1", replace_infs_udf(col("y"), lit(-99.0))).show()
    
    ## +----+--------+-----+
    ## |   x|       y|   x1|
    ## +----+--------+-----+
    ## |null|    null| null|
    ## | 1.0|Infinity|-99.0|
    ## |null|     2.0|  2.0|
    ## +----+--------+-----+
    

    或表达式如下:

    def replace_infs(c, v):
        is_infinite = c.isin([
            lit("+Infinity").cast("double"),
            lit("-Infinity").cast("double")
        ])
        return when(c.isNotNull() & is_infinite, v).otherwise(c)
    
    df.withColumn("x1", replace_infs(col("y"), lit(-99))).show()
    
    ## +----+--------+-----+
    ## |   x|       y|   x1|
    ## +----+--------+-----+
    ## |null|    null| null|
    ## | 1.0|Infinity|-99.0|
    ## |null|     2.0|  2.0|
    ## +----+--------+-----+
    

相关问题