我正在使用spark(通过Scala API)实现LDA模型,并使用不同数量的主题测试模型 . 它似乎一般工作正常,但遇到间歇性任务失败,我很确定与内存问题有关 . 我当前代码的相关部分如下 .

请注意,我正在从RDD的文本转储中加载我的数据,其中每个文档都是稀疏的mllib向量 . 所以我的 LDA_vectors 文件中的示例行如下所示:

(7066346,(112312,[1,3,5,7,...],[2.0,57.0,10.0,2.0,...]))

这是标准的mllib稀疏格式,可以读作

(document_id,(vocabulary_size,[term_id_1, term_id_2...], [term_1_freq,term_2, freq,...]))

所以 parse 函数处理将其读入RDD:

import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD
import java.io._

def parse(rdd: RDD[String]): RDD[(Long, Vector)] = {
  val pattern: scala.util.matching.Regex = "\\(([0-9]+),(.*)\\)".r
  rdd .map{
  case pattern(k, v) => (k.toLong, Vectors.parse(v))
  }
 }

val text = sc.textFile("/path/to/LDA_vectors")
val docsWithFeatures = parse(text).repartition(192).cache()

然后我对不同数量的主题进行循环 . 请注意,用于将word-document矩阵保存到文件的代码块遵循here描述的方法:

for (n_topics <- Range(10,301,5) ){

  val s = n_topics.toString
  val lda = new LDA().setK(n_topics).setMaxIterations(20) 
  val ldaModel = lda.run(docsWithFeatures)
  val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

  // write some model info to file
  val outfile = new File(s"model_summary_$s")
  @transient val bw = new BufferedWriter(new FileWriter(outfile))
  bw.write("topicConcentration:"+"\t"+distLDAModel.topicConcentration+"\n")
  bw.write("docConcentration:"+"\t"+distLDAModel.docConcentration(0)+"\n")
  bw.write("LL:"+"\t"+distLDAModel.logLikelihood+"\n")
  bw.close()

  // convert to Distributed model so we can get our topic data out 
  val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]

  // Save the document-topic matrix to file
  distLDAModel.topicDistributions.saveAsTextFile(s"doc_topic_$s")

  // this saves the word-topic matrix to file
  val topic_mat = distLDAModel.topicsMatrix
  val localMatrix: List[Array[Double]] = topic_mat.transpose.toArray.grouped(topic_mat.numCols).toList
  val lines: List[String] = localMatrix.map(line => line.mkString(" "))
  val outfile2 = new File(s"artist_topic_$s")
  @transient val bw2 = new BufferedWriter(new FileWriter(outfile2))
  for (line <- lines) bw2.write(line+"\n")
  bw2.close()
}

好 . 所以这一切都运行正常,但正如我所说,我开始遇到任务失败,我越来越有可能增加主题的数量 . 我认为这些是由内存问题引起的,这让我想知道如何在火花中调整LDA .

我在Google Cloud Dataproc上运行,因此我的资源很灵活,但我意识到我不了解Spark的LDA的内部结构,足以知道如何最好地优化性能 .

到目前为止,我的一次尝试是我在这一行中所做的:

val docsWithFeatures = parse(text).repartition(192).cache()

在这里,我将文档RDD重新分区为192个分区(对于此示例,我在48个核心上运行spark,因此使用了4 * n_cores的经验法则)并对其进行缓存 . 如果我在RDD上重复 Map ,这是合理的,但我不确定它是否/如何在这里有助于提高性能 . 我还能在这做什么?

为了便于任何回答,这里有一些关于我的语料库的摘要统计数据:

  • 文件:166,784

  • 词汇量(唯一术语数):112,312

  • 总代币:4,430,237,213

也许我的大量令牌是这里的主要问题,我只需要增加每个任务的内存,即增加可用内存并减少执行程序的数量 . 但当然这取决于Spark LDA在引擎盖下的工作方式 . 例如,参见我之前的问题here .