
Best way to create an ML pipeline in Apache Spark for a dataset with a large number of columns


I am working with Spark 2.1.1 on a dataset with ~2000 features and trying to create a basic ML pipeline consisting of some Transformers and a Classifier.

Let's assume, for the sake of simplicity, that the Pipeline I am working with consists of a VectorAssembler, a StringIndexer and a Classifier, which would be a rather frequent use case.

// Pipeline elements
val assmbleFeatures: VectorAssembler = new VectorAssembler()
  .setInputCols(featureColumns)
  .setOutputCol("featuresRaw")

val labelIndexer: StringIndexer = new StringIndexer()
  .setInputCol("TARGET")
  .setOutputCol("indexedLabel")

// Train a RandomForest model.
val rf: RandomForestClassifier = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("featuresRaw")
  .setMaxBins(30)

// add the params, unique to this classifier
val paramGrid = new ParamGridBuilder()
  .addGrid(rf.numTrees, Array(5))
  .addGrid(rf.maxDepth, Array(5))
  .build()

// Treat the Pipeline as an Estimator, to jointly choose parameters for all Pipeline stages.
val evaluator = new BinaryClassificationEvaluator()
  .setMetricName("areaUnderROC")
  .setLabelCol("indexedLabel")

If the pipeline steps are separated into a transformer pipeline (VectorAssembler + StringIndexer) and a second classifier pipeline, and if the unnecessary columns are dropped in between both pipelines, training succeeds. This means that for re-using the models, two PipelineModels have to be saved after training and an intermediary preprocessing step has to be introduced (a prediction-time sketch follows the code below).

// Split indexers and forest in two Pipelines.
val prePipeline = new Pipeline().setStages(Array(labelIndexer, assmbleFeatures)).fit(dfTrain)
// Transform data and drop all columns, except those needed for training 
val dfTrainT = prePipeline.transform(dfTrain)
val columnsToDrop = dfTrainT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTrainRdy = dfTrainT.drop(columnsToDrop:_*)

val mainPipeline = new Pipeline().setStages(Array(rf))

val cv = new CrossValidator()
  .setEstimator(mainPipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

val bestModel = cv.fit(dfTrainRdy).bestModel.asInstanceOf[PipelineModel]
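
For completeness, here is a minimal sketch (not part of the original question) of how the split approach would be applied to held-out data; dfTest comes from the randomSplit shown further below, and both the fitted prePipeline and bestModel would have to be persisted for later reuse.

// Hedged sketch: score the test set with the two fitted models.
val dfTestT = prePipeline.transform(dfTest)
val testColsToDrop = dfTestT.columns.filter(col => !Array("featuresRaw", "indexedLabel").contains(col))
val dfTestRdy = dfTestT.drop(testColsToDrop:_*)
val predictions = bestModel.transform(dfTestRdy)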

A much cleaner solution would (imho) be to merge all pipeline stages into one pipeline.

val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, assmbleFeatures, rf))

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(evaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)

// This will fail! 
val bestModel = cv.fit(dfTrain).bestModel.asInstanceOf[PipelineModel]

However, putting all PipelineStages into one Pipeline leads to the following exception, probably due to the issue this PR will eventually solve:

ERROR CodeGenerator: failed to compile: org.codehaus.janino.JaninoRuntimeException: Constant pool for class org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection has grown past JVM limit of 0xFFFF

The reason for this is that the VectorAssembler effectively doubles (in this example) the amount of data in the DataFrame, as there is no transformer that could drop the unnecessary columns (see spark pipeline vector assembler drop other columns).
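
One conceivable way to drop columns inside a single Pipeline is the built-in SQLTransformer, sketched below under the assumption that only indexedLabel and featuresRaw need to survive; whether such a stage actually avoids the codegen limit is a separate question (see the edit below).

import org.apache.spark.ml.feature.SQLTransformer

// Sketch: a column-dropping pipeline stage; __THIS__ refers to the DataFrame
// produced by the previous stage.
val keepOnlyNeeded = new SQLTransformer()
  .setStatement("SELECT indexedLabel, featuresRaw FROM __THIS__")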

The example works on the golub dataset, and the following preprocessing steps are necessary:

import org.apache.spark.sql.types.DoubleType
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage}
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature._
import org.apache.spark.sql._
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

val df = spark.read.option("header", true).option("inferSchema", true).csv("/path/to/dataset/golub_merged.csv").drop("_c0").repartition(100)

// Those steps are necessary, otherwise training would fail either way
val colsToDrop = df.columns.take(5000)
val dfValid = df.withColumn("TARGET", df("TARGET_REAL").cast(DoubleType)).drop("TARGET_REAL").drop(colsToDrop:_*)

// Split df in train and test sets
val Array(dfTrain, dfTest) = dfValid.randomSplit(Array(0.7, 0.3))

// Feature columns are columns except "TARGET"
val featureColumns = dfTrain.columns.filter(col => col != "TARGET")

As I am new to Spark, I am not sure what would be the best way to solve this issue. Would you suggest...

  • creating a new transformer, which drops columns and which can be incorporated into the pipeline?

  • splitting both pipelines and introducing the intermediary step?

  • anything else? :)

Or am I missing anything important (pipeline steps, the PR, etc.) that would solve this issue?


Edit:

I implemented a new Transformer, DroppingVectorAssembler, which drops unnecessary columns; however, the same exception is thrown (a rough sketch of what such a transformer could look like is shown below).

Besides that, setting spark.sql.codegen.wholeStage to false does not solve the issue.
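
For illustration, here is a rough sketch of what such a column-dropping Transformer could look like (the class name ColumnPruner and its constructor are made up here; this is not the author's DroppingVectorAssembler):

import org.apache.spark.ml.Transformer
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types.StructType

// Hypothetical sketch: keeps only the listed columns so that later stages
// see a narrow DataFrame.
class ColumnPruner(override val uid: String, val columnsToKeep: Array[String]) extends Transformer {

  def this(columnsToKeep: Array[String]) = this(Identifiable.randomUID("columnPruner"), columnsToKeep)

  override def transform(dataset: Dataset[_]): DataFrame = {
    val toDrop = dataset.columns.filterNot(c => columnsToKeep.contains(c))
    dataset.drop(toDrop: _*)
  }

  override def transformSchema(schema: StructType): StructType =
    StructType(schema.fields.filter(f => columnsToKeep.contains(f.name)))

  override def copy(extra: ParamMap): ColumnPruner = new ColumnPruner(uid, columnsToKeep)
}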

2 Answers

  • 2

    The janino error you are getting is due to the number of constant variables created during the optimizer process. The maximum number of constant variables allowed in the JVM is ((2^16) - 1). If this limit is exceeded, you get the Constant pool for class ... has grown past JVM limit of 0xFFFF error.

    The JIRA that will fix this issue is SPARK-18016, but it is still in progress at this time.

    Your code is most likely failing during the VectorAssembler stage, because it has to perform against thousands of columns during a single optimization task.

    The workaround I developed for this problem is to create a "vector of vectors" by working against subsets of the columns and then bringing the results together at the end to create a single feature vector. This prevents any single optimization task from exceeding the JVM constant-pool limit. It is not elegant, but I have used it on datasets reaching into the 10k-column range.

    This method also lets you keep a single pipeline, even though it requires some additional steps to make it work (creating the sub-vectors). After you have created the feature vector from the sub-vectors, you can drop the original source columns if desired (a short sketch of this follows the example below).

    Example Code:

    // IMPORT DEPENDENCIES
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.{SQLContext, Row, DataFrame, Column}
    import org.apache.spark.ml.feature.VectorAssembler
    import org.apache.spark.ml.{Pipeline, PipelineModel}
    
    // Create first example dataframe
    val exampleDF = spark.createDataFrame(Seq(
      (1, 1, 2, 3, 8, 4, 5, 1, 3, 2, 0, 4, 2, 8, 1, 1, 2, 3, 8, 4, 5),
      (2, 4, 3, 8, 7, 9, 8, 2, 3, 3, 2, 6, 5, 4, 2, 4, 3, 8, 7, 9, 8),
      (3, 6, 1, 9, 2, 3, 6, 3, 8, 5, 1, 2, 3, 5, 3, 6, 1, 9, 2, 3, 6),
      (4, 7, 8, 6, 9, 4, 5, 4, 9, 8, 2, 4, 9, 2, 4, 7, 8, 6, 9, 4, 5),
      (5, 9, 2, 7, 8, 7, 3, 5, 3, 4, 8, 0, 6, 2, 5, 9, 2, 7, 8, 7, 3),
      (6, 1, 1, 4, 2, 8, 4, 6, 3, 9, 8, 8, 9, 3, 6, 1, 1, 4, 2, 8, 4)
    )).toDF("uid", "col1", "col2", "col3", "col4", "col5", 
            "col6", "col7", "col8", "col9", "colA", "colB", 
            "colC", "colD", "colE", "colF", "colG", "colH", 
            "colI", "colJ", "colK")
    
    // Create multiple column lists using the sliding method
    val Array(colList1, colList2, colList3, colList4) = exampleDF.columns.filter(_ != "uid").sliding(5,5).toArray
    
    // Create a vector assembler for each column list
    val colList1_assembler = new VectorAssembler().setInputCols(colList1).setOutputCol("colList1_vec")
    val colList2_assembler = new VectorAssembler().setInputCols(colList2).setOutputCol("colList2_vec")
    val colList3_assembler = new VectorAssembler().setInputCols(colList3).setOutputCol("colList3_vec")
    val colList4_assembler = new VectorAssembler().setInputCols(colList4).setOutputCol("colList4_vec")
    
    // Create a vector assembler using column list vectors as input
    val features_assembler = new VectorAssembler().setInputCols(Array("colList1_vec","colList2_vec","colList3_vec","colList4_vec")).setOutputCol("features")
    
    // Create the pipeline with column list vector assemblers first, then the final vector of vectors assembler last
    val pipeline = new Pipeline().setStages(Array(colList1_assembler,colList2_assembler,colList3_assembler,colList4_assembler,features_assembler))
    
    // Fit and transform the data
    val featuresDF = pipeline.fit(exampleDF).transform(exampleDF)
    
    // Get the number of features in "features" vector
    val featureLength = (featuresDF.schema(featuresDF.schema.fieldIndex("features")).metadata.getMetadata("ml_attr").getLong("num_attrs"))
    
    // Print number of features in "features vector"
    print(featureLength)
    

    (Note: the creation of the column lists should really be done programmatically, but I have kept this example simple for the sake of understanding the concept; a programmatic variant is sketched below.)
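
    A possible programmatic variant of the grouping step (the names, the chunk size of 5 and the final drop are assumptions, not part of the original answer); it builds one sub-assembler per chunk and afterwards keeps only the id and the final feature vector:

    import org.apache.spark.ml.{Pipeline, PipelineStage}
    import org.apache.spark.ml.feature.VectorAssembler
    
    // Group the feature columns into chunks (5 per chunk to match the toy example;
    // in practice a few hundred columns per chunk)
    val featureCols = exampleDF.columns.filter(_ != "uid")
    val groups = featureCols.sliding(5, 5).toArray
    
    // One VectorAssembler per chunk, producing subVec_0, subVec_1, ...
    val subAssemblers = groups.zipWithIndex.map { case (cols, i) =>
      new VectorAssembler().setInputCols(cols).setOutputCol(s"subVec_$i"): PipelineStage
    }
    
    // Final assembler that merges the sub-vectors into a single "features" column
    val finalAssembler = new VectorAssembler()
      .setInputCols(groups.indices.map(i => s"subVec_$i").toArray)
      .setOutputCol("features")
    
    val autoPipeline = new Pipeline().setStages(subAssemblers :+ finalAssembler)
    val autoFeaturesDF = autoPipeline.fit(exampleDF).transform(exampleDF)
    
    // Keep only the id and the final feature vector, dropping the source columns
    // and the intermediate sub-vectors
    val keepCols = Array("uid", "features")
    val slimDF = autoFeaturesDF.drop(autoFeaturesDF.columns.filterNot(c => keepCols.contains(c)): _*)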

  • 1

    The janino error you are getting is because, depending on the feature set, the generated code becomes larger.

    I would separate the steps into different pipelines and drop the unnecessary features: save the intermediate models, such as the StringIndexer and OneHotEncoder, and load them during the prediction stage. This is also helpful because the transformations are faster for the data that has to be predicted (see the save/load sketch at the end of this answer).

    Finally, you do not need to keep the feature columns after the VectorAssembler stage has run, as it transforms the features into a feature vector and label column, and that is all you need to run predictions.

    Example of Pipeline in Scala with saving of intermediate steps (older Spark API)

    Also, if you are using an older version of Spark, i.e. 1.6.0, you need to check for the patched version, i.e. 2.1.1, 2.2.0 or 1.6.4; otherwise you would run into the Janino error even with around 400 feature columns.
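
    A minimal save/load sketch of that idea (the variable names, column names and paths below are placeholders, not from the original answer):

    import org.apache.spark.ml.PipelineModel
    
    // Persist the fitted preprocessing pipeline (StringIndexer, OneHotEncoder,
    // VectorAssembler, ...) once after training; the path is a placeholder.
    fittedPreprocessing.write.overwrite().save("/models/preprocessing")
    
    // Later, in the prediction job: reload it and transform the incoming data.
    val preprocessing = PipelineModel.load("/models/preprocessing")
    // After the assembler has run, only the feature vector and label are needed
    // (column names assumed).
    val dfToScoreReady = preprocessing.transform(dfToScore).select("features", "indexedLabel")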
