我正在使用Spark Streaming从Kafka获取批量的JSON读数 . 生成的批次从RDD转换为数据帧 .
我的目标是对此数据帧的每一行进行分类,因此我使用VectorAssembler创建将传递给模型的功能:
sqlContext = SQLContext(rdd.context)
rawReading = sqlContext.jsonRDD(rdd)
sensorReadings = rawReading.selectExpr("actual.y AS yaw","actual.p AS pitch", "actual.r AS roll")
assembler = VectorAssembler(
inputCols=["yaw", "pitch", "roll"],
outputCol="features")
sensorReadingsFinal = assembler.transform(sensorReadings)
sensorReadingsFinal.show()
+---+-----+----+-----------------+
|yaw|pitch|roll| features|
+---+-----+----+-----------------+
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
| 18| 17.5| 120|[18.0,17.5,120.0]|
+---+-----+----+-----------------+
我有一个我之前训练过的随机森林模型 .
loadedModel = RandomForestModel.load(sc, "MyRandomForest.model")
我的问题是,在将整个内容插入数据库之前,如何对数据框中的每一行进行预测?
我最初想的是做这样的事......
prediction = loadedModel.predict(sensorReadings.features)
但我意识到,由于数据帧有多行,我需要以某种方式添加一列并逐行进行预测 . 也许我说这一切都错了?
我想要的最终数据框是这样的:
+---+-----+----+-----------------+
|yaw|pitch|roll| Prediction|
+---+-----+----+-----------------+
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
| 18| 17.5| 120| 1 |
+---+-----+----+-----------------+
此时我将它保存到数据库:
sensorReadingsFinal.write.jdbc("jdbc:mysql://localhost/testdb", "SensorReadings", properties=connectionProperties)
1 回答
以下是我最终要解决的问题:
我基本上创建了一个仅包含特征向量和readingID的新dataFrame,然后将其连接回原始的dataFrame .
这可能不是最优雅的解决方案,所以如果有人可以提出更好的建议,请这样做 .