Edit: This edit may change the course of this issue.
在spark上运行mongodb聚合(特别是$ group)会在写回集合时创建重复的_id记录 . 结果,mongodb抛出重复键错误 . 顺便说一下,这个查询在mongo shell中运行得非常好 .
这是我做的:
我拿了一个小数据集,然后将(聚合)spark代码的结果打印到控制台,而不是写入集合 . 我打印完整的结果集,我在_id字段中找到了重复项 . 数据看起来像这样:(已编辑)
Document{{_id=Document{{prodCategory=123},{proId=ABC},{location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=2223}}
Document{{_id=Document{{prodCategory=123},{proId=ABC},{location=US}}, details=[Document{{....}},Document{{....}},Document{{...}}, count=123}}.
有许多这样的重复文件 . 我不明白的是,为什么火花没有在将它写入集合之前整合完整的(map ??)工作?每个分区只是映射记录并直接将其写入集合 . 这不是它应该如何正常工作?
如果有专家建议如何解决此问题,或者您在下面的 original post 中看到了我的代码中应该更改的任何内容 . 请指教 .
original Post:
我有以下收藏 .
prodTransactions:
{
_id:
ProdCategory:
product:
location:
customer:
eventTime:
status:
}
我的聚合列出了组的所有客户和日期,其中状态为“完成” . 以下是mongodb代码 .
db.prodTransactions.aggregate([
{$match: {status:'complete'}
, {$project:
{
prodId:1,
location:1,
customer:1,
status:1,
eventTime:1,
prodCategory:1
}}
, {$group:
{
_id: {prodCategory: "$prodCategory", lab: "$prodId", location: "$location"},
details: {$addToSet: {customer: "$customer", date: {$dateToString: {format: "%m/%d/%Y", date: "$eventTime"}}, time: {$dateToString: {format: "%H:%M:%S", date: "$eventTime"}}}},
count: {$sum: 1}
}}
, {$out : "prodAgg"}
],{allowDiskUse: true}
)
当我直接在mongodb中运行它时,它运行完美,没有问题并将所有数据保存到prodAgg集合中 . 聚合集合看起来像这样(编辑数据):
{
"_id" : {
"prodCategory" : "category1",
"prodId" : "prod1",
"location" : "US-EAST"
},
"details" : [
{
"customer" : "xxx@yyy.com",
"date" : "07/15/2016",
"time" : "14:00:48"
},
{
"customer" : "aaa@bbb.com",
"date" : "07/15/2016",
"time" : "19:05:48"
},
{
"customer" : "ccc@ddd.com",
"date" : "07/15/2016",
"time" : "17:55:48"
},
{
"customer" : "eee@fff.com",
"date" : "07/15/2016",
"time" : "19:20:49"
}
],
"count" : 4.0
}
问题是,如果我从spark执行此操作尝试将其写入集合 . 它写了一些文档然后失败,出现以下异常(编辑数据):
com.mongodb.MongoBulkWriteException:服务器192.168.56.1:27017上的批量写入操作错误 . 写入错误:[BulkWriteError {index = 6,code = 11000,message ='E11000重复键错误集合:dbname.prodAgg索引:id dup key:{:{prodCategory:“xxx”,prodId:“yyyyy”,location:“ US-EAST“}}',details = {}}] . 在com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
这个错误在过去的3天一直困扰着我,我无法超越这个 .
我的理解是(我可能是错的但是),组聚合本身不应该有任何重复,那么如何/为什么它会抛出重复键错误 . 或者我在聚合中做错了什么?还是scala代码?
如果以前有任何灵魂出现过这种情况,请取出一些亮光并将我从这个漩涡中拉出来 . 我真的很感激
这是我的scala代码 . 我正在使用
mongodb-spark-connector
import org.apache.spark.{SparkConf, SparkContext}
import com.mongodb.spark._
import org.bson.Document
import com.mongodb.spark.config._
val conf = new SparkConf().setAppName("ProdAnalytics1").setMaster("local").
set("spark.mongodb.input.uri","mongodb://192.168.56.1:27017/dbname.prodTransactions")
.set("spark.mongodb.output.uri", "mongodb://192.168.56.1:27017/dbname.prodAgg")
val sc = new SparkContext(conf)
val rdd = sc.loadFromMongoDB()
val aggRdd = rdd.withPipeline(Seq(
Document.parse("{$match:{status:'end'}"),
Document.parse("{$project: {prodId:1,location:1,customer:1,type:1,eventTime:1,prodCategory:1}}"),
Document.parse("{$group: {_id: {prodCategory: \"$prodCategory\", prodId: \"$prodId\", location: \"$location\"},details: {$addToSet: {customer: \"$customer\", date: \"$eventTime\"}},count: {$sum: 1}}}"),
Document.parse("{$sort: {count : -1}}, {allowDiskUse: true}")))
println(aggRdd.count)
// Using the write Config to Write to DB
val writeConf = WriteConfig(sc)
val writeConfig = WriteConfig(Map("collection" -> "prodAgg", "db" -> "dbname"), Some(writeConf))
MongoSpark.save(aggRdd, writeConfig)
我的SBT档案:
name := "Simple Project"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
//libraryDependencies += "org.apache.spark" %% "spark-mllib" % "1.6.1"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.1.0"
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"
resolvers += "Akka Repository" at "http://repo.akka.io/releases/"
resolvers += "snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/ "
resolvers += "releases" at "https://oss.sonatype.org/content/repositories/releases/"
注意:不使用最新版本spark的原因是,在最新版本中它引发了另一个异常:
线程中的异常“dag-scheduler-event-loop”java.lang.NoClassDefFoundError:org / apache / spark / sql / DataFrame
对于我的生活,我无法理解这是什么,我甚至不使用数据帧 . 所以...我会把它留在那......如果有人对此有任何建议,我会很乐意接受它 .
任何建议都非常感谢...谢谢 .
EDIT:
这是scala代码运行时的mongo日志 . 这是失败前的最后一块( edited )
command dbname.ProdTransaction命令:aggregate {aggregate:“ProdTransaction”,pipeline:[{$ match:{_ id:{$ gte:ObjectId('554c949ae4b0d28d51194caf'),$ lt:ObjectId('55be257be4b0c3bd1c74e202')}}},{ $ match:{$ and:[{status:“end”},{location:“US”},{prodId:{$ nin:[“abc”,“xxx”,“yyy”]}}]}}, {$ project:{prodId:1,location:1,customer:1,status:1,eventTime:1,prodCategory:1}},{$ group:{_ id:{lab:“$ prodId”,location:“$ location“},details:{$ addToSet:{prodCategory:”$ prodCategory“,user:”$ customer“,date:”$ eventTime“}},count:{$ sum:1}}},{$ sort:{ count:-1}}] cursor:{}} cursorid:258455518867 keyUpdates:0 writeConflicts:0 numYields:1335 reslen:4092243 locks:{Global:{acquireCount:{r:2694}},Database:{acquireCount:{r: 1347}},Collection:{acquireCount:{r:1347}}} protocol:op_query 1155ms
4 回答
我可能会弄错,但由于你的聚合返回一个带有键
_id
的对象,mongo会在插入时尝试使用它作为文档的ID . 不确定这是否是您想要的结果...如果没有,只需将_id
键更改为其他内容(id
即使可行)这里有很多回答,所以我把我的答案分成了三个部分;配置,为什么会发生以及如何解决它:
配置问题
当错误版本的MongoDB Spark Connector与Spark一起使用时会发生 . 如果你想使用Spark 2.0.x,你需要2.0.x MongoDB Spark Connector,例如:
(如果您的Scala版本不同步,也会发生这些误导性错误,例如使用Scala 2.10和编译为2.11的库)
关于安装,没有必要for:
libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "1.2.1"
. 它是一个独立的库,有自己的MongoClient,Codecs等,因此在连接器旁边使用时可能会导致错误 .为什么您可以期望使用$ group聚合重复_ids
Spark的工作原理是对数据集合进行分区,然后在Spark工作节点上对数据处理进行并行处理 .
Mongo Spark Connector中有各种分区程序,它们都默认在Document的
_id
键上对集合进行分区 . 然后在每个分区上执行聚合 .因此,当您在集合的分区上运行多个聚合时,您可以合理地期望在生成的RDD中生成重复的_id .
修复重复的_ids
我可以想到这个问题的三个可能的解决方案:
A)鉴于聚合管道的性质,您应该使用$out运算符 . 这更有益,因为数据保持在MongoDB本地,您不需要维护Spark集群 .
B)基于Spark的替代方案是在RDD上进行进一步处理,以便在保存回MongoDB之前合并任何重复的_id .
C)理论上,您可以提供自己的分区程序,该分区程序根据分组的
_id
字段返回分区 . 实际上,我不能想到一个好的方法来做到这一点,而不需要使用$in
过滤器的分区查询,这个过滤器效率不高 .我一直在调查MongoDB的问题,以下是我的一些想法:
我注意到你的聚合查询中有一个错误 - 在“
{$match:{status:'end'}
" intentional ? It should ends up with a double brace - "{$match:{status:'end'}}
”中缺少大括号 . 我在我的错误再现代码中更改了它 .IMO您的聚合查询不会生成重复的密钥 - 一旦您运行Spark应用程序,就会创建prodAgg集合并使用聚合结果填充,然后如果您第二次运行它将生成相同的密钥,但旧的集合将不会首先被删除 . 这是您遇到问题的原因 .
在火花聚合之前添加以下行:
我无法重现这种行为,但如果我将文件的_id排除在$ project运算符之外,它就像你说的那样工作 . 在这种情况下,我有一个解释 - 当没有源文档的_id时,聚合会创建一个带有新_id的新文档,然后您就没有重复 . 如果不在投影中排除_id,则应该从源文档继承,然后应该进行重复 .
我做了一些追踪和错误并缩小了问题(在Sagar Reddy的建议的帮助下) . 这是我发现的 .
如果代码中没有
$group
聚合,则聚合有效 . 只是$match, $project, $sort
..任何命令组合都可以正常工作 . 一旦我把$group
,即使有一个参数,它也会失败 .我认为它的原因是,
$group
是将 new _id 添加到集合中的唯一聚合 . 没有其他聚合命令会添加 "new" _id . 这就是问题所在,这就是问题所在 .我的问题是我需要
$group
用于我的聚合,如果没有它我就没有帮助 .如果您有任何人对此有解决方案/信息或解决方法,请告知 .
非常感谢 .