首页 文章

尝试使用scala从Spark向mongodb写入$ group聚合时出现重复键错误

提问于
浏览
2

Edit: This edit may change the course of this issue.

在spark上运行mongodb聚合(特别是$ group)会在写回集合时创建重复的_id记录 . 结果,mongodb抛出重复键错误 . 顺便说一下,这个查询在mongo shell中运行得非常好 .

这是我做的:

我拿了一个小数据集,然后将(聚合)spark代码的结果打印到控制台,而不是写入集合 . 我打印完整的结果集,我在_id字段中找到了重复项 . 数据看起来像这样:(已编辑)

Document{{_id=Document{{prodCategory=123},{proId=ABC},{‌​location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=2223}}

Document{{_id=Document{{prodCategory=123},{proId=ABC},{locat‌​ion=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=123}}.

有许多这样的重复文件 . 我不明白的是,为什么火花没有在将它写入集合之前整合完整的(map ??)工作?每个分区只是映射记录并直接将其写入集合 . 这不是它应该如何正常工作?

如果有专家建议如何解决此问题,或者您在下面的 original post 中看到了我的代码中应该更改的任何内容 . 请指教 .

original Post:

我有以下收藏 .

prodTransactions:
{
_id:
ProdCategory:
product:
location:
customer:
eventTime:
status:
}

我的聚合列出了组的所有客户和日期,其中状态为“完成” . 以下是mongodb代码 .

db.prodTransactions.aggregate([
{$match: {status:'complete'}
, {$project: 
    {
        prodId:1,
        location:1,
        customer:1,
        status:1,
        eventTime:1,
        prodCategory:1
    }}
, {$group: 
    {
        _id: {prodCategory: "$prodCategory", lab: "$prodId", location: "$location"},
        details: {$addToSet: {customer: "$customer", date: {$dateToString: {format: "%m/%d/%Y", date: "$eventTime"}}, time: {$dateToString: {format: "%H:%M:%S", date: "$eventTime"}}}},
        count: {$sum: 1}
    }}
, {$out : "prodAgg"}
],{allowDiskUse: true}
)

当我直接在mongodb中运行它时,它运行完美,没有问题并将所有数据保存到prodAgg集合中 . 聚合集合看起来像这样(编辑数据):

{
    "_id" : {
        "prodCategory" : "category1",
        "prodId" : "prod1",
        "location" : "US-EAST"
    },
    "details" : [ 
        {
            "customer" : "xxx@yyy.com",
            "date" : "07/15/2016",
            "time" : "14:00:48"
        }, 
        {
            "customer" : "aaa@bbb.com",
            "date" : "07/15/2016",
            "time" : "19:05:48"
        }, 
        {
            "customer" : "ccc@ddd.com",
            "date" : "07/15/2016",
            "time" : "17:55:48"
        }, 
        {
            "customer" : "eee@fff.com",
            "date" : "07/15/2016",
            "time" : "19:20:49"
        }
    ],
    "count" : 4.0
}

问题是,如果我从spark执行此操作尝试将其写入集合 . 它写了一些文档然后失败,出现以下异常(编辑数据):

com.mongodb.MongoBulkWriteException:服务器192.168.56.1:27017上的批量写入操作错误 . 写入错误:[BulkWriteError {index = 6,code = 11000,message ='E11000重复键错误集合:dbname.prodAgg索引:id dup key:{:{prodCategory:“xxx”,prodId:“yyyyy”,location:“ US-EAST“}}',details = {}}] . 在com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)

这个错误在过去的3天一直困扰着我,我无法超越这个 .

我的理解是(我可能是错的但是),组聚合本身不应该有任何重复,那么如何/为什么它会抛出重复键错误 . 或者我在聚合中做错了什么?还是scala代码?

如果以前有任何灵魂出现过这种情况,请取出一些亮光并将我从这个漩涡中拉出来 . 我真的很感激

这是我的scala代码 . 我正在使用

mongodb-spark-connector

import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._

val conf = new SparkConf().setAppName("ProdAnalytics1").setMaster("local").
      set("spark.mongodb.input.uri","mongodb://192.168.56.1:27017/dbname.prodTransactions")
        .set("spark.mongodb.output.uri", "mongodb://192.168.56.1:27017/dbname.prodAgg")

      val sc = new SparkContext(conf)
      val rdd = sc.loadFromMongoDB()

      val aggRdd = rdd.withPipeline(Seq(
      Document.parse("{$match:{status:'end'}"),
      Document.parse("{$project: {prodId:1,location:1,customer:1,type:1,eventTime:1,prodCategory:1}}"),
      Document.parse("{$group: {_id: {prodCategory: \"$prodCategory\", prodId: \"$prodId\", location: \"$location\"},details: {$addToSet: {customer: \"$customer\", date: \"$eventTime\"}},count: {$sum: 1}}}"),
      Document.parse("{$sort: {count : -1}}, {allowDiskUse: true}")))
    println(aggRdd.count)

// Using the write Config to Write to DB
     val writeConf = WriteConfig(sc)
     val writeConfig = WriteConfig(Map("collection" -> "prodAgg", "db" -> "dbname"), Some(writeConf))
     MongoSpark.save(aggRdd, writeConfig)

我的SBT档案:

name := "Simple Project"
    version := "1.0"

    scalaVersion := "2.11.7"

    libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"

    //libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.1"

    libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1"

    libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"

    libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"

    resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
    resolvers += "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/ "
    resolvers += "releases"  at "https://oss.sonatype.org/content/repositories/releases/"

注意:不使用最新版本spark的原因是,在最新版本中它引发了另一个异常:

线程中的异常“dag-scheduler-event-loop”java.lang.NoClassDefFoundError:org / apache / spark / sql / DataFrame

对于我的生活,我无法理解这是什么,我甚至不使用数据帧 . 所以...我会把它留在那......如果有人对此有任何建议,我会很乐意接受它 .

任何建议都非常感谢...谢谢 .

EDIT:

这是scala代码运行时的mongo日志 . 这是失败前的最后一块( edited

command dbname.ProdTransaction命令:aggregate {aggregate:“ProdTransaction”,pipeline:[{$ match:{_ id:{$ gte:ObjectId('554c949ae4b0d28d51194caf'),$ lt:ObjectId('55be257be4b0c3bd1c74e202')}}},{ $ match:{$ and:[{status:“end”},{location:“US”},{prodId:{$ nin:[“abc”,“xxx”,“yyy”]}}]}}, {$ project:{prodId:1,location:1,customer:1,status:1,eventTime:1,prodCategory:1}},{$ group:{_ id:{lab:“$ prodId”,location:“$ location“},details:{$ addToSet:{prodCategory:”$ prodCategory“,user:”$ customer“,date:”$ eventTime“}},count:{$ sum:1}}},{$ sort:{ count:-1}}] cursor:{}} cursorid:258455518867 keyUpdates:0 writeConflicts:0 numYields:1335 reslen:4092243 locks:{Global:{acquireCount:{r:2694}},Database:{acquireCount:{r: 1347}},Collection:{acquireCount:{r:1347}}} protocol:op_query 1155ms

4 回答

  • 1

    我可能会弄错,但由于你的聚合返回一个带有键 _id 的对象,mongo会在插入时尝试使用它作为文档的ID . 不确定这是否是您想要的结果...如果没有,只需将 _id 键更改为其他内容( id 即使可行)

  • 0

    这里有很多回答,所以我把我的答案分成了三个部分;配置,为什么会发生以及如何解决它:

    配置问题

    线程中的异常“dag-scheduler-event-loop”java.lang.NoClassDefFoundError:org / apache / spark / sql / DataFrame

    当错误版本的MongoDB Spark Connector与Spark一起使用时会发生 . 如果你想使用Spark 2.0.x,你需要2.0.x MongoDB Spark Connector,例如:

    libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.0.0"
    

    (如果您的Scala版本不同步,也会发生这些误导性错误,例如使用Scala 2.10和编译为2.11的库)

    关于安装,没有必要for: libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1" . 它是一个独立的库,有自己的MongoClient,Codecs等,因此在连接器旁边使用时可能会导致错误 .

    为什么您可以期望使用$ group聚合重复_ids

    Spark的工作原理是对数据集合进行分区,然后在Spark工作节点上对数据处理进行并行处理 .

    Mongo Spark Connector中有各种分区程序,它们都默认在Document的 _id 键上对集合进行分区 . 然后在每个分区上执行聚合 .

    因此,当您在集合的分区上运行多个聚合时,您可以合理地期望在生成的RDD中生成重复的_id .

    修复重复的_ids

    我可以想到这个问题的三个可能的解决方案:

    A)鉴于聚合管道的性质,您应该使用$out运算符 . 这更有益,因为数据保持在MongoDB本地,您不需要维护Spark集群 .

    B)基于Spark的替代方案是在RDD上进行进一步处理,以便在保存回MongoDB之前合并任何重复的_id .

    C)理论上,您可以提供自己的分区程序,该分区程序根据分组的 _id 字段返回分区 . 实际上,我不能想到一个好的方法来做到这一点,而不需要使用 $in 过滤器的分区查询,这个过滤器效率不高 .

  • 0

    我一直在调查MongoDB的问题,以下是我的一些想法:

    • 我注意到你的聚合查询中有一个错误 - 在“ {$match:{status:'end'} " intentional ? It should ends up with a double brace - " {$match:{status:'end'}} ”中缺少大括号 . 我在我的错误再现代码中更改了它 .

    • IMO您的聚合查询不会生成重复的密钥 - 一旦您运行Spark应用程序,就会创建prodAgg集合并使用聚合结果填充,然后如果您第二次运行它将生成相同的密钥,但旧的集合将不会首先被删除 . 这是您遇到问题的原因 .

    在火花聚合之前添加以下行:

    val mongoClient = new MongoClient() 
    val db = mongoClient.getDatabase("dbname")
    db.getCollection("prodAgg").drop()
    

    如果代码中没有$ group聚合,则聚合有效 . 只需$ match,$ project,$ sort ..任何命令组合都可以 . 一旦我把$ group,即使有一个参数,它也会失败 .

    我无法重现这种行为,但如果我将文件的_id排除在$ project运算符之外,它就像你说的那样工作 . 在这种情况下,我有一个解释 - 当没有源文档的_id时,聚合会创建一个带有新_id的新文档,然后您就没有重复 . 如果不在投影中排除_id,则应该从源文档继承,然后应该进行重复 .

  • 0

    我做了一些追踪和错误并缩小了问题(在Sagar Reddy的建议的帮助下) . 这是我发现的 .

    如果代码中没有 $group 聚合,则聚合有效 . 只是 $match, $project, $sort ..任何命令组合都可以正常工作 . 一旦我把 $group ,即使有一个参数,它也会失败 .

    我认为它的原因是, $group 是将 new _id 添加到集合中的唯一聚合 . 没有其他聚合命令会添加 "new" _id . 这就是问题所在,这就是问题所在 .

    我的问题是我需要 $group 用于我的聚合,如果没有它我就没有帮助 .

    如果您有任何人对此有解决方案/信息或解决方法,请告知 .

    非常感谢 .

相关问题