首页 文章

使用Spark,LDA预测新文档的准确性

提问于
浏览
0

我和Spark的Mllib合作,现在正在与LDA合作 .

但是当我使用Spark提供的代码(参见下文)来预测用于训练模型的Doc时,预测的结果(文档主题)与受过训练的文档主题的结果相反 .

我不知道结果是什么造成的 .

请求帮助,这是我的代码如下:

火车: $lda.run(corpus) 语料库是这样的RDD: $RDD[(Long, Vector)] Vector包含词汇,单词索引,wordcounts . 预测:

def predict(documents: RDD[(Long, Vector)], ldaModel: LDAModel):        Array[(Long, Vector)] = {
    var docTopicsWeight = new Array[(Long, Vector)](documents.collect().length)
    ldaModel match {
      case localModel: LocalLDAModel =>
        docTopicsWeight = localModel.topicDistributions(documents).collect()
      case distModel: DistributedLDAModel =>
        docTopicsWeight = distModel.toLocal.topicDistributions(documents).collect()
    }
    docTopicsWeight
  }

2 回答

  • 0

    我直接在模型上使用案例类 . 你需要使用 isInstanceOfasInstanceOf 方法 .

    def predict(documents: RDD[(Long, org.apache.spark.mllib.linalg.Vector)], ldaModel: LDAModel): Array[(Long, org.apache.spark.mllib.linalg.Vector)] = {
    
        var docTopicsWeight = new Array[(Long, org.apache.spark.mllib.linalg.Vector)](documents.collect().length)
        if (ldaModel.isInstanceOf[LocalLDAModel]) {
          docTopicsWeight = ldaModel.asInstanceOf[LocalLDAModel].topicDistributions(documents).collect
        } else if (ldaModel.isInstanceOf[DistributedLDAModel]) {
          docTopicsWeight = ldaModel.asInstanceOf[DistributedLDAModel].toLocal.topicDistributions(documents).collect
        }
        docTopicsWeight
    
    }
    
  • 1

    即使这似乎是一个老帖子,我最近发现自己有同样的问题 . 我想我理解你报道的问题 .

    如果您尝试对每个文档和2个主题只有一个单词的2个小文档进行非常基本的测试,使用EM进行培训,然后从DistributedLDAModel获取topicDistributions,使用正确的alpha和beta,您可以获得模型来推断属于每个文档的每个文档主题即文档1 - 主题1,文档2 - 主题2,在我的情况下,我使每个文档的每个文档的概率为0.998 .

    运行相同的测试,但这次将DistributedLDAModel转换为LocalLDAModel,每个文档属于一个主题的概率降低到0.666(使用相同的alpha和beta以及主题数) .

    所以我接下来要做的是重载.toLocal方法来接受一个新的alpha和beta,然后用这些方法来使它更接近第一个测试但是我有更多的场景要覆盖,每次我都要修改alpha参数 .

    我们团队的结论是,尝试使用DistributedLDAModel转换为LocalLDAModel进行预测似乎并不合适 . https://github.com/rabarona/spark-shell-utils/tree/master/2.1.0/spark-mllib/DistributedLDAModel-to-LocalLDAModel

    你的结论是什么?你找到了解决方案吗?

    钯 . 这就是我发现在小例子上运行测试的内容,如果我遗漏了某些内容或者我说错了,请告诉我 .

    代码示例:

    import org.apache.log4j.{Level, Logger}
    import org.apache.spark.mllib.linalg.{Matrix, Vector, Vectors}
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.clustering._
    
    import scala.collection.mutable
    
    // Turn off warning messages:
    Logger.getLogger("org").setLevel(Level.ERROR)
    
    // Set number of topics
    val numTopics: Int = 2
    
    // Create corpus
    val data: RDD[(String, String, Int)] = spark.sparkContext.parallelize(Seq(("cat fancy", "cat", 1),("dog world", "dog", 1)))
    val corpus: RDD[Array[String]]  = data.map({case (title: String, words: String, count: Int) => Array(words)})
    val corpusSize: Long = corpus.count
    val termCounts: Array[(String, Long)] = corpus.flatMap(_.map(_ -> 1L)).reduceByKey(_+_).collect.sortBy(-_._2)
    val vocabArray: Array[String] = termCounts.takeRight(termCounts.size).map(_._1)
    val vocab: Map[String, Int] = vocabArray.zipWithIndex.toMap
    val documents: RDD[(Long, Vector)] =
        corpus.zipWithIndex.map { case (tokens, id) =>
            val counts = new mutable.HashMap[Int, Double]()
            tokens.foreach { term =>
                if (vocab.contains(term)) {
                    val idx = vocab(term)
                    counts(idx) = counts.getOrElse(idx, 0.0) + 1.0
                }
            }
            (id, Vectors.sparse(vocab.size, counts.toSeq))
        }
    
    /*
    Corpus
    (0,(2,[1],[1.0]))
    (1,(2,[0],[1.0]))
    */
    
    // Set up EM LDA Optimizer
    val emOptimizer: EMLDAOptimizer = new EMLDAOptimizer
    
    // Set up Online LDA Optimizer
    val onlineOptimizer: OnlineLDAOptimizer = new OnlineLDAOptimizer()
    .setOptimizeDocConcentration(true)
    .setMiniBatchFraction({
            val corpusSize = corpus.count()
            if (corpusSize < 2) 0.75
            else (0.05 + 1) / corpusSize
          })
    
    // Run LDA using EM LDA Optimizer and get as instance of Distributed LDA Model
    val distributedModel: DistributedLDAModel = new LDA().setK(numTopics).setMaxIterations(20).setAlpha(1.002).setBeta(1.001).setOptimizer(emOptimizer).run(documents).asInstanceOf[DistributedLDAModel]
    
    distributedModel.topicsMatrix.toString(2, Int.MaxValue).split("\n")(0)
    distributedModel.topicsMatrix.toString(2, Int.MaxValue).split("\n")(1)
    
    println("***** Distributed LDA Model topic distributions *****")
    distributedModel.topicDistributions.collect.foreach(println)
    
    // Run LDA using Online LDA Optimizer and get as instance of Local LDA Model
    val localModel: LocalLDAModel = new LDA().setK(numTopics).setMaxIterations(100).setAlpha(0.0009).setBeta(0.00001).setOptimizer(onlineOptimizer).run(documents).asInstanceOf[LocalLDAModel]
    
    println("***** Local LDA Model topic distributions *****")
    localModel.topicDistributions(documents).collect.foreach(println)
    /*
    documentid, topicDistributions
    (0,[0.999997999996,2.0000040000157828E-6])
    (1,[2.000004000015782E-6,0.999997999996])
    */
    
    
    // Convert Distributed LDA Model to Local LDA Model
    val convertedModel: LocalLDAModel = distributedModel.toLocal
    
    println("***** Local LDA Model from Distributed LDA Model topic distributions *****")
    println("Performance is affected due to document concentration still same as used for EM")
    convertedModel.topicDistributions(documents).collect.foreach(println)
    

相关问题