首页 文章

MongoDB:将来自多个集合的数据合并为一个......如何?

提问于
浏览
187

我如何(在MongoDB中)将来自多个集合的数据合并到一个集合中?

我可以使用map-reduce吗?如果是,那么如何?

我非常感谢一些例子,因为我是新手 .

9 回答

  • 98

    虽然您无法实时执行此操作,但可以使用MongoDB 1.8 map / reduce中的"reduce" out选项多次运行map-reduce以将数据合并在一起(请参阅http://www.mongodb.org/display/DOCS/MapReduce#MapReduce-Outputoptions) . 您需要在两个集合中都有一些密钥才能用作_id .

    例如,假设您有一个 users 集合和一个 comments 集合,并且您希望拥有一个新集合,其中包含每个评论的一些用户人口统计信息 .

    假设 users 集合包含以下字段:

    • _id

    • firstName

    • lastName

    • 国家

    • 性别

    • 年龄

    然后 comments 集合包含以下字段:

    • _id

    • userId

    • 评论

    • 已创建

    你会做这个map / reduce:

    var mapUsers, mapComments, reduce;
    db.users_comments.remove();
    
    // setup sample data - wouldn't actually use this in production
    db.users.remove();
    db.comments.remove();
    db.users.save({firstName:"Rich",lastName:"S",gender:"M",country:"CA",age:"18"});
    db.users.save({firstName:"Rob",lastName:"M",gender:"M",country:"US",age:"25"});
    db.users.save({firstName:"Sarah",lastName:"T",gender:"F",country:"US",age:"13"});
    var users = db.users.find();
    db.comments.save({userId: users[0]._id, "comment": "Hey, what's up?", created: new ISODate()});
    db.comments.save({userId: users[1]._id, "comment": "Not much", created: new ISODate()});
    db.comments.save({userId: users[0]._id, "comment": "Cool", created: new ISODate()});
    // end sample data setup
    
    mapUsers = function() {
        var values = {
            country: this.country,
            gender: this.gender,
            age: this.age
        };
        emit(this._id, values);
    };
    mapComments = function() {
        var values = {
            commentId: this._id,
            comment: this.comment,
            created: this.created
        };
        emit(this.userId, values);
    };
    reduce = function(k, values) {
        var result = {}, commentFields = {
            "commentId": '', 
            "comment": '',
            "created": ''
        };
        values.forEach(function(value) {
            var field;
            if ("comment" in value) {
                if (!("comments" in result)) {
                    result.comments = [];
                }
                result.comments.push(value);
            } else if ("comments" in value) {
                if (!("comments" in result)) {
                    result.comments = [];
                }
                result.comments.push.apply(result.comments, value.comments);
            }
            for (field in value) {
                if (value.hasOwnProperty(field) && !(field in commentFields)) {
                    result[field] = value[field];
                }
            }
        });
        return result;
    };
    db.users.mapReduce(mapUsers, reduce, {"out": {"reduce": "users_comments"}});
    db.comments.mapReduce(mapComments, reduce, {"out": {"reduce": "users_comments"}});
    db.users_comments.find().pretty(); // see the resulting collection
    

    此时,您将拥有一个名为 users_comments 的新集合,其中包含合并的数据,您现在可以使用它 . 这些缩小的集合都有 _id ,这是您在 Map 函数中发出的关键,然后所有值都是 value 键中的子对象 - 这些值不在这些缩小文档的顶层 .

    这是一个有点简单的例子 . 您可以使用更多集合重复此操作,以便继续构建简化集合 . 您还可以在此过程中对数据进行摘要和聚合 . 可能您会定义多个reduce函数,因为聚合和保留现有字段的逻辑变得更加复杂 .

    您还会注意到,现在每个用户都有一个文档,其中包含该用户在数组中的所有注释 . 如果我们合并具有一对一关系而不是一对多关系的数据,那么它将是平坦的,您可以简单地使用如下的reduce函数:

    reduce = function(k, values) {
        var result = {};
        values.forEach(function(value) {
            var field;
            for (field in value) {
                if (value.hasOwnProperty(field)) {
                    result[field] = value[field];
                }
            }
        });
        return result;
    };
    

    如果你想展平 users_comments 集合,那么它是每个评论的一个文档,另外运行:

    var map, reduce;
    map = function() {
        var debug = function(value) {
            var field;
            for (field in value) {
                print(field + ": " + value[field]);
            }
        };
        debug(this);
        var that = this;
        if ("comments" in this.value) {
            this.value.comments.forEach(function(value) {
                emit(value.commentId, {
                    userId: that._id,
                    country: that.value.country,
                    age: that.value.age,
                    comment: value.comment,
                    created: value.created,
                });
            });
        }
    };
    reduce = function(k, values) {
        var result = {};
        values.forEach(function(value) {
            var field;
            for (field in value) {
                if (value.hasOwnProperty(field)) {
                    result[field] = value[field];
                }
            }
        });
        return result;
    };
    db.users_comments.mapReduce(map, reduce, {"out": "comments_with_demographics"});
    

    绝对不应该在飞行中执行此技术 . 它's suited for a cron job or something like that which updates the merged data periodically. You' ll可能想要在新集合上运行 ensureIndex 以确保您对它执行的查询运行得很快(请记住,您的数据仍然在 value 键内,所以如果您在注释 created 时间索引 comments_with_demographics ,它会是 db.comments_with_demographics.ensureIndex({"value.created": 1});

  • 11

    MongoDB 3.2现在允许通过$lookup aggregation stage将来自多个集合的数据合并为一个 . 作为一个实际的例子,假设你有关于书籍的数据分成两个不同的集合 .

    第一个集合,名为 books ,具有以下数据:

    {
        "isbn": "978-3-16-148410-0",
        "title": "Some cool book",
        "author": "John Doe"
    }
    {
        "isbn": "978-3-16-148999-9",
        "title": "Another awesome book",
        "author": "Jane Roe"
    }
    

    第二个集合名为 books_selling_data ,具有以下数据:

    {
        "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
        "isbn": "978-3-16-148410-0",
        "copies_sold": 12500
    }
    {
        "_id": ObjectId("56e31ce076cdf52e541d9d28"),
        "isbn": "978-3-16-148999-9",
        "copies_sold": 720050
    }
    {
        "_id": ObjectId("56e31ce076cdf52e541d9d29"),
        "isbn": "978-3-16-148999-9",
        "copies_sold": 1000
    }
    

    合并两个集合只需要通过以下方式使用$ lookup:

    db.books.aggregate([{
        $lookup: {
                from: "books_selling_data",
                localField: "isbn",
                foreignField: "isbn",
                as: "copies_sold"
            }
    }])
    

    在此聚合之后, books 集合将如下所示:

    {
        "isbn": "978-3-16-148410-0",
        "title": "Some cool book",
        "author": "John Doe",
        "copies_sold": [
            {
                "_id": ObjectId("56e31bcf76cdf52e541d9d26"),
                "isbn": "978-3-16-148410-0",
                "copies_sold": 12500
            }
        ]
    }
    {
        "isbn": "978-3-16-148999-9",
        "title": "Another awesome book",
        "author": "Jane Roe",
        "copies_sold": [
            {
                "_id": ObjectId("56e31ce076cdf52e541d9d28"),
                "isbn": "978-3-16-148999-9",
                "copies_sold": 720050
            },
            {
                "_id": ObjectId("56e31ce076cdf52e541d9d28"),
                "isbn": "978-3-16-148999-9",
                "copies_sold": 1000
            }
        ]
    }
    

    重要的是要注意以下几点:

    • "from"集合,在本例中为 books_selling_data ,无法分片 .

    • "as"字段将是一个数组,如上例所示 .

    • $lookup stage上的"localField"和"foreignField"选项如果在各自的集合中不存在,则会被视为空,以便进行匹配($lookup docs有一个完美的例子) .

    因此,作为一个结论,如果你想整合两个集合,在这种情况下,拥有一个平面的copy_sold字段和销售的总副本,你将需要更多的工作,可能使用一个中间集合,然后,是$out到最后的收藏 .

  • 131

    如果mongodb中没有批量插入,我们循环 small_collection 中的所有对象并将它们逐个插入到_745696中:

    db.small_collection.find().forEach(function(obj){ 
       db.big_collection.insert(obj)
    });
    
  • -1

    $ lookup的非常基本的例子 .

    db.getCollection('users').aggregate([
        {
            $lookup: {
                from: "userinfo",
                localField: "userId",
                foreignField: "userId",
                as: "userInfoData"
            }
        },
        {
            $lookup: {
                from: "userrole",
                localField: "userId",
                foreignField: "userId",
                as: "userRoleData"
            }
        },
        { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }},
        { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
    ])
    

    这是用的

    { $unwind: { path: "$userInfoData", preserveNullAndEmptyArrays: true }}, 
     { $unwind: { path: "$userRoleData", preserveNullAndEmptyArrays: true }}
    

    代替

    { $unwind:"$userRoleData"} 
    { $unwind:"$userRoleData"}
    

    因为如果找不到与$ lookup匹配的记录, { $unwind:"$userRoleData"} 将返回空或0结果 .

  • 8

    在聚合中为多个集合使用多个 $lookup

    query:

    db.getCollection('servicelocations').aggregate([
      {
        $match: {
          serviceLocationId: {
            $in: ["36728"]
          }
        }
      },
      {
        $lookup: {
          from: "orders",
          localField: "serviceLocationId",
          foreignField: "serviceLocationId",
          as: "orders"
        }
      },
      {
        $lookup: {
          from: "timewindowtypes",
          localField: "timeWindow.timeWindowTypeId",
          foreignField: "timeWindowTypeId",
          as: "timeWindow"
        }
      },
      {
        $lookup: {
          from: "servicetimetypes",
          localField: "serviceTimeTypeId",
          foreignField: "serviceTimeTypeId",
          as: "serviceTime"
        }
      },
      {
        $unwind: "$orders"
      },
      {
        $unwind: "$serviceTime"
      },
      {
        $limit: 14
      }
    ])
    

    result:

    {
        "_id" : ObjectId("59c3ac4bb7799c90ebb3279b"),
        "serviceLocationId" : "36728",
        "regionId" : 1.0,
        "zoneId" : "DXBZONE1",
        "description" : "AL HALLAB REST EMIRATES MALL",
        "locationPriority" : 1.0,
        "accountTypeId" : 1.0,
        "locationType" : "SERVICELOCATION",
        "location" : {
            "makani" : "",
            "lat" : 25.119035,
            "lng" : 55.198694
        },
        "deliveryDays" : "MTWRFSU",
        "timeWindow" : [ 
            {
                "_id" : ObjectId("59c3b0a3b7799c90ebb32cde"),
                "timeWindowTypeId" : "1",
                "Description" : "MORNING",
                "timeWindow" : {
                    "openTime" : "06:00",
                    "closeTime" : "08:00"
                },
                "accountId" : 1.0
            }, 
            {
                "_id" : ObjectId("59c3b0a3b7799c90ebb32cdf"),
                "timeWindowTypeId" : "1",
                "Description" : "MORNING",
                "timeWindow" : {
                    "openTime" : "09:00",
                    "closeTime" : "10:00"
                },
                "accountId" : 1.0
            }, 
            {
                "_id" : ObjectId("59c3b0a3b7799c90ebb32ce0"),
                "timeWindowTypeId" : "1",
                "Description" : "MORNING",
                "timeWindow" : {
                    "openTime" : "10:30",
                    "closeTime" : "11:30"
                },
                "accountId" : 1.0
            }
        ],
        "address1" : "",
        "address2" : "",
        "phone" : "",
        "city" : "",
        "county" : "",
        "state" : "",
        "country" : "",
        "zipcode" : "",
        "imageUrl" : "",
        "contact" : {
            "name" : "",
            "email" : ""
        },
        "status" : "ACTIVE",
        "createdBy" : "",
        "updatedBy" : "",
        "updateDate" : "",
        "accountId" : 1.0,
        "serviceTimeTypeId" : "1",
        "orders" : [ 
            {
                "_id" : ObjectId("59c3b291f251c77f15790f92"),
                "orderId" : "AQ18O1704264",
                "serviceLocationId" : "36728",
                "orderNo" : "AQ18O1704264",
                "orderDate" : "18-Sep-17",
                "description" : "AQ18O1704264",
                "serviceType" : "Delivery",
                "orderSource" : "Import",
                "takenBy" : "KARIM",
                "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
                "plannedDeliveryTime" : "",
                "actualDeliveryDate" : "",
                "actualDeliveryTime" : "",
                "deliveredBy" : "",
                "size1" : 296.0,
                "size2" : 3573.355,
                "size3" : 240.811,
                "jobPriority" : 1.0,
                "cancelReason" : "",
                "cancelDate" : "",
                "cancelBy" : "",
                "reasonCode" : "",
                "reasonText" : "",
                "status" : "",
                "lineItems" : [ 
                    {
                        "ItemId" : "BNWB020",
                        "size1" : 15.0,
                        "size2" : 78.6,
                        "size3" : 6.0
                    }, 
                    {
                        "ItemId" : "BNWB021",
                        "size1" : 20.0,
                        "size2" : 252.0,
                        "size3" : 11.538
                    }, 
                    {
                        "ItemId" : "BNWB023",
                        "size1" : 15.0,
                        "size2" : 285.0,
                        "size3" : 16.071
                    }, 
                    {
                        "ItemId" : "CPMW112",
                        "size1" : 3.0,
                        "size2" : 25.38,
                        "size3" : 1.731
                    }, 
                    {
                        "ItemId" : "MMGW001",
                        "size1" : 25.0,
                        "size2" : 464.375,
                        "size3" : 46.875
                    }, 
                    {
                        "ItemId" : "MMNB218",
                        "size1" : 50.0,
                        "size2" : 920.0,
                        "size3" : 60.0
                    }, 
                    {
                        "ItemId" : "MMNB219",
                        "size1" : 50.0,
                        "size2" : 630.0,
                        "size3" : 40.0
                    }, 
                    {
                        "ItemId" : "MMNB220",
                        "size1" : 50.0,
                        "size2" : 416.0,
                        "size3" : 28.846
                    }, 
                    {
                        "ItemId" : "MMNB270",
                        "size1" : 50.0,
                        "size2" : 262.0,
                        "size3" : 20.0
                    }, 
                    {
                        "ItemId" : "MMNB302",
                        "size1" : 15.0,
                        "size2" : 195.0,
                        "size3" : 6.0
                    }, 
                    {
                        "ItemId" : "MMNB373",
                        "size1" : 3.0,
                        "size2" : 45.0,
                        "size3" : 3.75
                    }
                ],
                "accountId" : 1.0
            }, 
            {
                "_id" : ObjectId("59c3b291f251c77f15790f9d"),
                "orderId" : "AQ137O1701240",
                "serviceLocationId" : "36728",
                "orderNo" : "AQ137O1701240",
                "orderDate" : "18-Sep-17",
                "description" : "AQ137O1701240",
                "serviceType" : "Delivery",
                "orderSource" : "Import",
                "takenBy" : "KARIM",
                "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
                "plannedDeliveryTime" : "",
                "actualDeliveryDate" : "",
                "actualDeliveryTime" : "",
                "deliveredBy" : "",
                "size1" : 28.0,
                "size2" : 520.11,
                "size3" : 52.5,
                "jobPriority" : 1.0,
                "cancelReason" : "",
                "cancelDate" : "",
                "cancelBy" : "",
                "reasonCode" : "",
                "reasonText" : "",
                "status" : "",
                "lineItems" : [ 
                    {
                        "ItemId" : "MMGW001",
                        "size1" : 25.0,
                        "size2" : 464.38,
                        "size3" : 46.875
                    }, 
                    {
                        "ItemId" : "MMGW001-F1",
                        "size1" : 3.0,
                        "size2" : 55.73,
                        "size3" : 5.625
                    }
                ],
                "accountId" : 1.0
            }, 
            {
                "_id" : ObjectId("59c3b291f251c77f15790fd8"),
                "orderId" : "AQ110O1705036",
                "serviceLocationId" : "36728",
                "orderNo" : "AQ110O1705036",
                "orderDate" : "18-Sep-17",
                "description" : "AQ110O1705036",
                "serviceType" : "Delivery",
                "orderSource" : "Import",
                "takenBy" : "KARIM",
                "plannedDeliveryDate" : ISODate("2017-08-26T00:00:00.000Z"),
                "plannedDeliveryTime" : "",
                "actualDeliveryDate" : "",
                "actualDeliveryTime" : "",
                "deliveredBy" : "",
                "size1" : 60.0,
                "size2" : 1046.0,
                "size3" : 68.0,
                "jobPriority" : 1.0,
                "cancelReason" : "",
                "cancelDate" : "",
                "cancelBy" : "",
                "reasonCode" : "",
                "reasonText" : "",
                "status" : "",
                "lineItems" : [ 
                    {
                        "ItemId" : "MMNB218",
                        "size1" : 50.0,
                        "size2" : 920.0,
                        "size3" : 60.0
                    }, 
                    {
                        "ItemId" : "MMNB219",
                        "size1" : 10.0,
                        "size2" : 126.0,
                        "size3" : 8.0
                    }
                ],
                "accountId" : 1.0
            }
        ],
        "serviceTime" : {
            "_id" : ObjectId("59c3b07cb7799c90ebb32cdc"),
            "serviceTimeTypeId" : "1",
            "serviceTimeType" : "nohelper",
            "description" : "",
            "fixedTime" : 30.0,
            "variableTime" : 0.0,
            "accountId" : 1.0
        }
    }
    
  • 2

    Mongorestore具有附加在数据库中已有的任何内容之上的功能,因此这种行为可用于组合两个集合:

    • mongodump collection1

    • collection2.rename(collection1)

    • mongorestore

    还没试过,但它可能比map / reduce方法执行得更快 .

  • 0

    代码段 . 礼貌 - 堆栈溢出的多个帖子,包括这个 .

    db.cust.drop();
     db.zip.drop();
     db.cust.insert({cust_id:1, zip_id: 101});
     db.cust.insert({cust_id:2, zip_id: 101});
     db.cust.insert({cust_id:3, zip_id: 101});
     db.cust.insert({cust_id:4, zip_id: 102});
     db.cust.insert({cust_id:5, zip_id: 102});
    
     db.zip.insert({zip_id:101, zip_cd:'AAA'});
     db.zip.insert({zip_id:102, zip_cd:'BBB'});
     db.zip.insert({zip_id:103, zip_cd:'CCC'});
    
    mapCust = function() {
        var values = {
            cust_id: this.cust_id
        };
        emit(this.zip_id, values);
    };
    
    mapZip = function() {
        var values = {
        zip_cd: this.zip_cd
        };
        emit(this.zip_id, values);
    };
    
    reduceCustZip =  function(k, values) {
        var result = {};
        values.forEach(function(value) {
        var field;
            if ("cust_id" in value) {
                if (!("cust_ids" in result)) {
                    result.cust_ids = [];
                }
                result.cust_ids.push(value);
            } else {
        for (field in value) {
            if (value.hasOwnProperty(field) ) {
                    result[field] = value[field];
            }
             };  
           }
          });
           return result;
    };
    
    
    db.cust_zip.drop();
    db.cust.mapReduce(mapCust, reduceCustZip, {"out": {"reduce": "cust_zip"}});
    db.zip.mapReduce(mapZip, reduceCustZip, {"out": {"reduce": "cust_zip"}});
    db.cust_zip.find();
    
    
    mapCZ = function() {
        var that = this;
        if ("cust_ids" in this.value) {
            this.value.cust_ids.forEach(function(value) {
                emit(value.cust_id, {
                    zip_id: that._id,
                    zip_cd: that.value.zip_cd
                });
            });
        }
    };
    
    reduceCZ = function(k, values) {
        var result = {};
        values.forEach(function(value) {
            var field;
            for (field in value) {
                if (value.hasOwnProperty(field)) {
                    result[field] = value[field];
                }
            }
        });
        return result;
    };
    db.cust_zip_joined.drop();
    db.cust_zip.mapReduce(mapCZ, reduceCZ, {"out": "cust_zip_joined"}); 
    db.cust_zip_joined.find().pretty();
    
    
    var flattenMRCollection=function(dbName,collectionName) {
        var collection=db.getSiblingDB(dbName)[collectionName];
    
        var i=0;
        var bulk=collection.initializeUnorderedBulkOp();
        collection.find({ value: { $exists: true } }).addOption(16).forEach(function(result) {
            print((++i));
            //collection.update({_id: result._id},result.value);
    
            bulk.find({_id: result._id}).replaceOne(result.value);
    
            if(i%1000==0)
            {
                print("Executing bulk...");
                bulk.execute();
                bulk=collection.initializeUnorderedBulkOp();
            }
        });
        bulk.execute();
    };
    
    
    flattenMRCollection("mydb","cust_zip_joined");
    db.cust_zip_joined.find().pretty();
    
  • 0

    是的你可以:采取我今天写的这个实用功能:

    function shangMergeCol() {
      tcol= db.getCollection(arguments[0]);
      for (var i=1; i<arguments.length; i++){
        scol= db.getCollection(arguments[i]);
        scol.find().forEach(
            function (d) {
                tcol.insert(d);
            }
        )
      }
    }
    

    您可以将任意数量的集合传递给此函数,第一个集合将成为目标集合 . 所有其余的集合都是要转移到目标集合的源 .

  • 5

    您've to do that in your application layer. If you'使用ORM,它可以使用注释(或类似的东西)来提取其他集合中存在的引用 . 我只有使用Morphia@Reference注释在查询时获取引用的实体,因此我可以避免在代码中自己执行此操作 .

相关问题