我使用Spark和Scala进行时间序列分析 . 我有一个从Cassandra数据库中获取的数据集,如下所示:
scala> train.printSchema
root
|-- timestamp: timestamp (nullable = true)
|-- vx: double (nullable = true)
|-- speed: double (nullable = true)
我尝试了线性回归,如here所示只是为了看它是如何工作的 .
scala> val lr = new LinearRegression().
| setMaxIter(10).
| setRegParam(0.3).
| setElasticNetParam(0.8)
scala> val lrModel = lr.fit(train)
但是,我收到一个错误:
java.lang.IllegalArgumentException:字段“features”不存在 . at org.apache.spark.sql.types.StructType $$ anonfun $ apply $ 1.apply(StructType.sca la:266)at org.apache.spark.sql.types.StructType $$ anonfun $ apply $ 1.apply(StructType .sca la:266)在scala.collection.MapLike $ class.getOrElse(MapLike.scala:128)at scala.collection.AbstractMap.getOrElse(Map.scala:59)at org.apache.spark.sql.types.StructType .apply(StructType.scala:265)atg.apache.spark.ml.util.SchemaUtils $ .checkColumnType(SchemaUtils.scala:40)at org.apache.spark.ml.PredictorParams $ class.validateAndTransformSchema(Predic tor.scala :51)atg.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:82)位于org.apache.spark.ml的org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:144) . PipelineStage.transformSchema(Pipeline.scala:74)在org.apache.spark.ml.Predictor.fit(Predictor.scala:100)... 66 elided
似乎我必须使用 VectorAssembler
来创建包含预测变量的特征列,
scala> val assembler = new VectorAssembler().
| setInputCols(Array("timestamp","speed")).
| setOutputCol("features")
scala> val output = assembler.transform(train)
但它会抛出错误 TimestampType is not supported
.
java.lang.IllegalArgumentException:不支持数据类型TimestampType . at org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ transformSchema $ 1.appl y(VectorAssembler.scala:121)at org.apache.spark.ml.feature.VectorAssembler $$ anonfun $ transformSchema $ 1.appl y( VectorAssembler.scala:117)scala.collection.IndexedSeqOptimized $ class.foreach(IndexedSeqOptimized.scal a:33)at org.apache.spark的scala.collection.mutable.ArrayOps $ ofRef.foreach(ArrayOps.scala:186) . ml.feature.VectorAssembler.transformSchema(VectorAssembler .scala:117)atg.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:74)at org.apache.spark.ml.feature.VectorAssembler.transform(VectorAssembler . 斯卡拉:54)... 66被省略了
如果我省略时间戳并且只在 VectorAssembler
中使用一列,则会再次抛出错误 . 见下文,
scala> val assembler = new VectorAssembler().
| setInputCols(Array("speed")).
| setOutputCol("features")
scala> val output = assembler.transform(train)
scala> val lrModel = lr.fit(output)
java.lang.IllegalArgumentException:字段“label”不存在 . at org.apache.spark.sql.types.StructType $$ anonfun $ apply $ 1.apply(StructType.sca la:266)at org.apache.spark.sql.types.StructType $$ anonfun $ apply $ 1.apply(StructType .sca la:266)在scala.collection.MapLike $ class.getOrElse(MapLike.scala:128)at scala.collection.AbstractMap.getOrElse(Map.scala:59)at org.apache.spark.sql.types.StructType .apply(StructType.scala:265)atg.apache.spark.ml.util.SchemaUtils $ .checkNumericType(SchemaUtils.scala:71)at org.apache.spark.ml.PredictorParams $ class.validateAndTransformSchema(Predic tor.scala :53)atg.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:82)位于org.apache.spark.ml的org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:144) . PipelineStage.transformSchema(Pipeline.scala:74)在org.apache.spark.ml.Predictor.fit(Predictor.scala:100)... 66 elided
当我单独输入 speed
作为预测器时,我不知道为什么它会说 Field "label" does not exist
. 任何帮助深表感谢 .
1 回答
您需要定义要用作要素和类标签的列/列 . 如果将多列用作要素,则使用
VectorAssembler()
是合适的,就像您所做的那样 . 否则,只需使用setFeaturesCol()
方法和列名就足够了 . 请注意,此处的输入列必须包含向量,不能是双精度数 .对于类标签(它属于哪个类),您可以使用
setLabelCol()
来定义要使用的列 . 在您的情况下,由于timestamp
和speed
列是预测变量,我认为vx
列是标签 .要使用时间戳,您只需将其转换为Unix纪元时间;
这将为您提供自1970年1月1日以来的秒数 .
希望能帮助到你!