首页 文章

Apache Spark中的CPU使用率是否有限?

提问于
浏览
1

我最近发现,即使在 local[1] 模式下运行spark或使用带有1个 Actuator 和1个核心的Yarn,在UDF中添加并行计算(例如使用并行集合)也会提高性能 .

例如 . 在 local[1] 模式下,Spark-Jobs消耗尽可能多的CPU(即,如果我有8个内核,使用 top 测量,则为800%) .

这看起来很奇怪,因为我认为Spark(或纱线)限制了每个Spark应用程序的CPU使用率?

所以我想知道为什么会这样,是否建议在spark中使用并行处理/多线程或者我应该坚持使用并行化模式的火花?

这里有一个例子(在纱线客户端模式下用1个实例和1个核心测量的时间)

case class MyRow(id:Int,data:Seq[Double])

// create dataFrame
val rows = 10
val points = 10000
import scala.util.Random.nextDouble
val data = {1 to rows}.map{i => MyRow(i, Stream.continually(nextDouble()).take(points))}
val df = sc.parallelize(data).toDF().repartition($"id").cache()

df.show() // trigger computation and caching

// some expensive dummy-computation for each array-element
val expensive = (d:Double) => (1 to 10000).foldLeft(0.0){case(a,b) => a*b}*d

val serialUDF = udf((in:Seq[Double]) => in.map{expensive}.sum)
val parallelUDF = udf((in:Seq[Double]) => in.par.map{expensive}.sum)

df.withColumn("sum",serialUDF($"data")).show() // takes ~ 10 seconds
df.withColumn("sum",parallelUDF($"data")).show() // takes ~ 2.5 seconds

2 回答

  • 1

    Spark不直接限制CPU,而是定义spark创建的并发线程数 . 因此对于local [1],它基本上可以并行运行一个任务 . 当您执行in.par.map 时,您正在创建spark无法管理的线程,因此不受此限制的处理 . 即你告诉spark将自己限制在一个线程,然后创建其他线程而不知道它 .

    通常,在spark操作中执行并行线程并不是一个好主意 . 相反,最好告诉spark它可以使用多少个线程,并确保有足够的并行分区 .

  • 0

    Spark是CPU使用率的配置

    val conf = new SparkConf()
                 .setMaster("local[2]")
                 .setAppName("CountingSheep")
    val sc = new SparkContext(conf)
    

    更改本地[*]它将利用所有CPU核心 .

相关问题