首页 文章

PySpark PCA:如何将数据帧行从多列转换为单列DenseVector?

提问于
浏览
4

我想使用PySpark(Spark 1.6.2)对Hive表中存在的数值数据执行主成分分析(PCA) . 我能够将Hive表导入Spark数据帧:

>>> from pyspark.sql import HiveContext
>>> hiveContext = HiveContext(sc)
>>> dataframe = hiveContext.sql("SELECT * FROM my_table")
>>> type(dataframe)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> dataframe.columns
['par001', 'par002', 'par003', etc...]
>>> dataframe.collect()
[Row(par001=1.1, par002=5.5, par003=8.2, etc...), Row(par001=0.0, par002=5.7, par003=4.2, etc...), etc...]

有一篇优秀的StackOverflow帖子,展示了如何在PySpark中执行PCA:https://stackoverflow.com/a/33481471/2626491

在帖子的“测试”部分中,@ assellnaut创建了一个只包含一列的数据框(称为“要素”):

>>> from pyspark.ml.feature import *
>>> from pyspark.mllib.linalg import Vectors
>>> data = [(Vectors.dense([0.0, 1.0, 0.0, 7.0, 0.0]),),
...          (Vectors.dense([2.0, 0.0, 3.0, 4.0, 5.0]),),
...          (Vectors.dense([4.0, 0.0, 0.0, 6.0, 7.0]),)]
>>> df = sqlContext.createDataFrame(data,["features"])
>>> type(df)
<class 'pyspark.sql.dataframe.DataFrame'>
>>> df.columns
['features']
>>> df.collect()
[Row(features=DenseVector([0.0, 1.0, 0.0, 7.0, 0.0])), Row(features=DenseVector([2.0, 0.0, 3.0, 4.0, 5.0])), Row(features=DenseVector([4.0, 0.0, 0.0, 6.0, 7.0]))]

@ desertnaut的示例数据帧中的每一行都包含一个 DenseVector 对象,然后 pca 函数使用该对象 .

问:如何将Hive中的数据帧转换为单列数据帧("features"),其中每行包含表示原始行中所有值的 DenseVector

1 回答

  • 8

    你应该使用 VectorAssembler . 如果数据与此类似:

    from pyspark.sql import Row
    
    data = sc.parallelize([
        Row(par001=1.1, par002=5.5, par003=8.2),
        Row(par001=0.0, par002=5.7, par003=4.2)
    ]).toDF()
    

    你应该导入所需的类:

    from pyspark.ml.feature import VectorAssembler
    

    创建一个实例:

    assembler = VectorAssembler(inputCols=data.columns, outputCol="features")
    

    转换并选择:

    assembler.transform(data).select("features")
    

    您还可以使用用户定义的函数 . 在Spark 1.6中,从 mllib 导入 VectorsVectorUDT

    from pyspark.mllib.linalg import Vectors, VectorUDT
    

    udf 来自 sql.functions

    from pyspark.sql.functions import udf, array
    

    并选择:

    data.select(
      udf(Vectors.dense, VectorUDT())(*data.columns)
    ).toDF("features")
    

    这不是那么冗长,而是慢得多 .

相关问题