首页 文章

PySpark Mllib预测DataFrame中的所有行

提问于
浏览
0

我正在使用Spark Streaming从Kafka获取批量的JSON读数 . 生成的批次从RDD转换为数据帧 .

我的目标是对此数据帧的每一行进行分类,因此我使用VectorAssembler创建将传递给模型的功能:

sqlContext = SQLContext(rdd.context)
rawReading = sqlContext.jsonRDD(rdd)
sensorReadings = rawReading.selectExpr("actual.y AS yaw","actual.p AS pitch", "actual.r AS roll")
assembler = VectorAssembler(
        inputCols=["yaw", "pitch", "roll"],
        outputCol="features")
sensorReadingsFinal = assembler.transform(sensorReadings)
sensorReadingsFinal.show()
+---+-----+----+-----------------+
|yaw|pitch|roll|         features|
+---+-----+----+-----------------+
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
+---+-----+----+-----------------+

我有一个我之前训练过的随机森林模型 .

loadedModel = RandomForestModel.load(sc, "MyRandomForest.model")

我的问题是,在将整个内容插入数据库之前,如何对数据框中的每一行进行预测?

我最初想的是做这样的事......

prediction = loadedModel.predict(sensorReadings.features)

但我意识到,由于数据帧有多行,我需要以某种方式添加一列并逐行进行预测 . 也许我说这一切都错了?

我想要的最终数据框是这样的:

+---+-----+----+-----------------+
|yaw|pitch|roll|       Prediction|
+---+-----+----+-----------------+
| 18| 17.5| 120|              1  |
| 18| 17.5| 120|              1  |
| 18| 17.5| 120|              1  |
| 18| 17.5| 120|              1  |
| 18| 17.5| 120|              1  |
+---+-----+----+-----------------+

此时我将它保存到数据库:

sensorReadingsFinal.write.jdbc("jdbc:mysql://localhost/testdb", "SensorReadings", properties=connectionProperties)

1 回答

  • 0

    以下是我最终要解决的问题:

    # Convert DStream RDD's to DataFrame and run SQL query
        sqlContext = SQLContext(rdd.context)
        if rdd.isEmpty() == False:
            rawReading = sqlContext.jsonRDD(rdd)
            sensorReadings = rawReading.selectExpr("actual.y AS yaw","actual.p AS pitch", "actual.r AS roll")
            assembler = VectorAssembler(
                        inputCols=["yaw","pitch","roll"], # Must be in same order as what was used to train the model.  Testing using only pitch since model has limited dataset.
                        outputCol="features")
            sensorReadings = assembler.transform(sensorReadings)
    
            # Create a new dataFrame of predictions and readingID
            predictions = loadedModel.predict(sensorReadings.map(lambda x: x.features))
            predictionsDF = sensorReadings.map(lambda x: x.readingID).zip(predictions).toDF(["readingID","positionID"])
    
            # Join the prediction dataFrame back to the sensorReadings dataFrame.  Drop the duplicate readingID column.
            combinedDF = sensorReadings.join(predictionsDF, sensorReadings.readingID == predictionsDF.readingID).drop(predictionsDF.readingID)
    
            #  Drop the feature vector column
            combinedDF = combinedDF.drop("features")
    
            combinedDF.show()
    

    我基本上创建了一个仅包含特征向量和readingID的新dataFrame,然后将其连接回原始的dataFrame .

    这可能不是最优雅的解决方案,所以如果有人可以提出更好的建议,请这样做 .

相关问题