
Spark spends a long time on HadoopRDD: Input splits


I am running logistic regression with SGD on a large libsvm file. The file is about 10 GB and contains 40 million training examples.

When I run my Scala code with spark-submit, I notice that Spark spends a lot of time logging lines like this:

18/02/07 04:44:50 INFO HadoopRDD: Input split: file:/ebs2/preprocess/xaa:234881024+33554432
18/02/07 04:44:51 INFO Executor: Finished task 6.0 in stage 1.0 (TID 7). 875 bytes result sent to driver
18/02/07 04:44:51 INFO TaskSetManager: Starting task 8.0 in stage 1.0 (TID 9, localhost, executor driver, partition 8, PROCESS_LOCAL, 7872 bytes)
18/02/07 04:44:51 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 1025 ms on localhost (executor driver) (10/307)

Why is Spark doing so many 'HadoopRDD: Input splits'? What's the purpose of that, and how do I go about speeding up or getting rid of this process?

Here is the code:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.evaluation.MulticlassMetrics 

import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.mllib.classification.LogisticRegressionWithSGD
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.optimization.L1Updater
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.util.MLUtils
import scala.compat.Platform._ 


object test {

    def main(args: Array[String]) {

        val nnodes = 1
        val epochs = 3

        val conf = new SparkConf().setAppName("Test Name")
        val sc = new SparkContext(conf)

        val t0=currentTime
        val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa",  262165, 4)
        val test = MLUtils.loadLibSVMFile(sc,  "/ebs2/preprocess/xab",   262165, 4)
        val t1=currentTime;

        println("START")
        val lrAlg = new LogisticRegressionWithSGD()
        lrAlg.optimizer.setMiniBatchFraction(10.0/40000000.0)
        lrAlg.optimizer.setNumIterations(12000000)
        lrAlg.optimizer.setStepSize(0.01)

        val model = lrAlg.run(train)        


        model.clearThreshold()
        val scoreAndLabels = test.map { point =>
              val score = model.predict(point.features)
              (score, point.label)
        }

        val metrics = new BinaryClassificationMetrics(scoreAndLabels)
        val auROC = metrics.areaUnderROC()
        println("Area under ROC = " + auROC)
     }
}
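
Each logged "HadoopRDD: Input split" line corresponds to one partition of the loaded RDD, and each split above is 33554432 bytes (32 MB), so a roughly 10 GB file produces on the order of 300 splits, which matches the "(10/307)" progress counter in the log. As a quick sanity check (a minimal sketch added for illustration; getNumPartitions is a standard RDD method), the partition counts can be printed right after loading:

// Each "Input split" log line corresponds to one of these partitions,
// and each partition is processed as a separate task.
println(s"train partitions: ${train.getNumPartitions}")
println(s"test partitions: ${test.getNumPartitions}")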

1 Answer


    I fixed the speed issue by running

    train = train.coalesce(1)
    train.cache()

    and by increasing the total memory to 64 GB. Spark was probably not caching the data properly before because there was not enough RAM.
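
    Applied to the code in the question, the fix would look roughly like this (a sketch of the answer's approach, not the answerer's exact code; forcing evaluation with count() is an added assumption):

    // Coalesce the many small input splits into one partition and cache the
    // parsed LabeledPoints so repeated SGD passes reuse them from memory.
    val train = MLUtils.loadLibSVMFile(sc, "/ebs2/preprocess/xaa", 262165, 4)
      .coalesce(1)
      .cache()
    train.count()  // force evaluation so the data is cached before training starts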
