首页 文章

Spark DataFrame到RDD并返回

提问于
浏览
3

我正在使用Scala编写Apache Spark应用程序 . 为了处理和存储数据,我使用DataFrames . 我有一个很好的管道,包括特征提取和MultiLayerPerceptron分类器,使用ML API .

我也想使用SVM(用于比较目的) . 事情是(如果我弄错的话,纠正我)只有MLLib提供SVM . 并且MLLib还没有准备好处理DataFrames,只有RDD .

所以我想我可以使用DataFrames维护我的应用程序的核心并使用SVM 1)我只需将DataFrame的列转换为 RDD[LabeledPoint] 和2)在分类后将SVMs预测作为新列添加到DataFrame .

我用一个小函数处理的第一部分:

private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
    val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
    rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
}

我必须指定和转换向量的类型,因为特征提取方法使用ML API而不是MLLib .

然后,这个 RDD[LabeledPoint] 被送到SVM并且分类顺利进行,没有问题 . 在最后和火花的例子之后我得到一个 RDD[Double]

val predictions = rdd.map(point => model.predict(point.features))

现在,我想将预测分数作为列添加到原始DataFrame并返回它 . 这是我被卡住的地方 . 我可以使用 RDD[Double] 转换为DataFrame

(sql context ommited)
import sqlContext.implicits._
val plDF = predictions.toDF("prediction")

但是如何加入两个DataFrame,其中第二个DataFrame成为原始列的一个列?我尝试使用方法 joinunion ,但得到SQL异常,因为DataFrames没有相等的列来加入或联合 .

EDIT 我试过了

data.withColumn("prediction", plDF.col("prediction"))

但我得到一个分析例外:(

1 回答

  • 0

    我没有想到如何在不重复使用RDD的情况下做到这一点,但无论如何这里是我用RDD解决它的方式 . 添加了其余代码,以便任何人都能理解完整的逻辑 . 任何建议表示赞赏 .

    package stuff
    
    import java.util.logging.{Level, Logger}
    
    import org.apache.spark.mllib.classification.{SVMModel, SVMWithSGD}
    import org.apache.spark.mllib.linalg.Vectors
    import org.apache.spark.mllib.regression.LabeledPoint
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{DataFrame, Row, SQLContext}
    
    /**
      * Created by camandros on 10-03-2017.
      */
    class LinearSVMClassifier extends Classifier with Serializable{
    
      @transient lazy val log: Logger = Logger.getLogger(getClass.getName)
    
      private var model : SVMModel = _
    
      override def train(data : DataFrame): Unit = {
        val rdd = dataFrameToRDD(data)
        // Run training algorithm to build the model
        val numIter : Int = 100
        val step = Osint.properties(Osint.SVM_STEPSIZE).toDouble
        val c = Osint.properties(Osint.SVM_C).toDouble
        log.log(Level.INFO, "Initiating SVM training with parameters: C="+c+", step="+step)
        model = SVMWithSGD.train(rdd, numIterations = numIter, stepSize = step, regParam = c)
        log.log(Level.INFO, "Model training finished")
    
        // Clear the default threshold.
        model.clearThreshold()
      }
    
      override def classify(data : DataFrame): DataFrame = {
        log.log(Level.INFO, "Converting DataFrame to RDD")
        val rdd = dataFrameToRDD(data)
        log.log(Level.INFO, "Conversion finished; beginning classification")
        // Compute raw scores on the test set.
        val predictions = rdd.map(point => model.predict(point.features))
        log.log(Level.INFO, "Classification finished; Transforming RDD to DataFrame")
    
        val sqlContext : SQLContext = Osint.spark.sqlContext
        val tupleRDD = data.rdd.zip(predictions).map(t => Row.fromSeq(t._1.toSeq ++ Seq(t._2)))
        sqlContext.createDataFrame(tupleRDD, data.schema.add("predictions", "Double"))
    
        //TODO this should work it doesn't since this "withColumn" method seems to be applicable only to add
        // new columns using information from the same dataframe; therefore I am using the horrible rdd conversion
        //val sqlContext : SQLContext = Osint.spark.sqlContext
        //import sqlContext.implicits._
        //val plDF = predictions.toDF("predictions")
        //data.withColumn("prediction", plDF.col("predictions"))
      }
    
      private def dataFrameToRDD(dataFrame : DataFrame) : RDD[LabeledPoint] = {
        val rddMl = dataFrame.select("label", "features").rdd.map(r => (r.getInt(0).toDouble, r.getAs[org.apache.spark.ml.linalg.SparseVector](1)))
        rddMl.map(r => new LabeledPoint(r._1, Vectors.dense(r._2.toArray)))
      }
    }
    

相关问题