This is not a duplicate of this, because I'm asking what the input should be, not which function to call; see below.
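I followed this guide to create an LDA model in Spark 1.5. I saw in this question that to get the topic distribution of a new document, I need to use LocalLDAModel's topicDistributions function, which takes an RDD[(Long, Vector)].
Should the new document vector be a term-count vector? That is the type of vector the LDA model was trained on. My code compiles and runs, but I'd like to know whether this is the intended use of the topicDistributions function.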
import org.apache.spark.rdd._
import org.apache.spark.mllib.clustering.{LDA, DistributedLDAModel, LocalLDAModel}
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import scala.collection.mutable
val input = Seq("this is a document","this could be another document","these are training, not tests", "here is the final file (document)")
val corpus: RDD[Array[String]] = sc.parallelize(input.map{
  doc => doc.split("\\s")
})
val termCounts: Array[(String, Long)] = corpus.flatMap(_.map(_ -> 1L)).reduceByKey(_ + _).collect().sortBy(-_._2)
val vocabArray: Array[String] = termCounts.takeRight(termCounts.size).map(_._1)
val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap
// Convert documents into term count vectors
val documents: RDD[(Long, Vector)] =
  corpus.zipWithIndex.map { case (tokens, id) =>
    val counts = new mutable.HashMap[Int, Double]()
    tokens.foreach { term =>
      if (vocab.contains(term)) {
        val idx = vocab(term)
        counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
      }
    }
    (id, Vectors.sparse(vocab.size, counts.toSeq))
  }
// Set LDA parameters
val numTopics = 10
val ldaModel: DistributedLDAModel = new LDA().setK(numTopics).setMaxIterations(20).run(documents).asInstanceOf[DistributedLDAModel]
//create test input, convert to term count, and get its topic distribution
val test_input = Seq("this is my test document")
val test_document:RDD[(Long,Vector)] = sc.parallelize(test_input.map(doc=>doc.split("\\s"))).zipWithIndex.map{ case (tokens, id) =>
  val counts = new mutable.HashMap[Int, Double]()
  tokens.foreach { term =>
    if (vocab.contains(term)) {
      val idx = vocab(term)
      counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
    }
  }
  (id, Vectors.sparse(vocab.size, counts.toSeq))
}
println("test_document: "+test_document.first._2.toArray.mkString(", "))
val localLDAModel: LocalLDAModel = ldaModel.toLocal
// Score the new test document, not the training corpus
val topicDistributions = localLDAModel.topicDistributions(test_document)
println("first topic distribution:"+topicDistributions.first._2.toArray.mkString(", "))
1 Answer
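Judging from the comment on the documents parameter in the Spark source, the answer is yes: the new document vector should be a term-count vector. In addition, the vector ordering must be the same as the one used in training.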
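To illustrate the "same ordering" point, here is a minimal sketch of a shared vectorization helper that can be applied to both training and new documents, so that term indices always come from the training vocabulary. The names TermCounts and toTermCounts are hypothetical, not part of Spark. The helper is plain Scala and only produces (index, count) pairs. It assumes that out-of-vocabulary tokens are simply dropped, as in the question's code.

```scala
object TermCounts {
  // Hypothetical helper (not a Spark API): turns a tokenized document into
  // (termIndex, count) pairs using the vocabulary built at training time.
  // Tokens that are not in the vocabulary are ignored.
  def toTermCounts(tokens: Seq[String], vocab: Map[String, Int]): Seq[(Int, Double)] =
    tokens
      .flatMap(t => vocab.get(t))                            // keep only terms seen in training
      .groupBy(identity)                                     // termIndex -> its occurrences
      .map { case (idx, occ) => (idx, occ.size.toDouble) }   // count the occurrences
      .toSeq
      .sortBy(_._1)                                          // deterministic order by term index
}
```

With the question's vocab in scope, the pairs for a new document could then be passed to Vectors.sparse(vocab.size, TermCounts.toTermCounts(tokens, vocab)) to build the vector expected by topicDistributions. Using one helper for both the training and test paths avoids the two copies of the counting loop drifting apart.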