首页 文章

Mongoose找到一个并推送到一系列文档

提问于
浏览
3

我是MongoDB和Mongoose的新手,我正在尝试使用它来保存日间交易分析的股票价格 . 所以我想象这个架构:

symbolSchema = Schema({
    name:String,
    code:String
});

quoteSchema = Schema({
    date:{type:Date, default: now},
    open:Number, 
    high:Number,
    low:Number,
    close:Number,
    volume:Number
});

intradayQuotesSchema = Schema({
    id_symbol:{type:Schema.Types.ObjectId, ref:"symbol"},
    day:Date,
    quotes:[quotesSchema]
});

从我的链接我每分钟收到这样的信息:

date | symbol | open | high | low | close | volume

2015-03-09 13:23:00 | AAPL | 127,14 | 127,17 | 127,12 | 127,15 | 19734

我必须:

  • 找到符号的ObjectId(AAPL) .

  • 发现此符号的intradayQuote文档是否已存在(符号和日期组合)

  • 发现此符号的分钟OHLCV数据是否存在于引号数组中(因为它可以重复)

  • 更新或创建文档并在阵列内更新或创建引号

如果引号已经存在,我能够完成此任务而不会发生任何事情,但是此方法可以在引号数组中创建重复的条目:

symbol.find({"code":mySymbol}, function(err, stock) {
    intradayQuote.findOneAndUpdate({
        { id_symbol:stock[0]._id, day: myDay },
        { $push: { quotes: myQuotes } },
        { upsert: true },
        myCallback
    });
});

我已经尝试过:

  • $addToSet 而不是$ push,但不幸的是,这似乎不适用于文档数组

  • {id_symbol:stock [0] ._ id,day:myDay,'quotes[ 2739462 ]':myDate}的条件 findOneAndUpdate ;但不幸的是,如果mongo找不到它,它会为分钟创建一个新文档,而不是附加到引号数组 .

有没有办法让这个工作不使用一个更多的查询(我已经使用2)?我应该重新考虑我的架构以促进这项工作吗?任何帮助将不胜感激 . 谢谢!

1 回答

  • 2

    基本上把$addToSet运算符不适合你,因为你的数据不是真正的"set"定义是"completely distinct"对象的集合 .

    这里的另一个逻辑意义是,当数据到达时,您将处理数据,无论是作为sinlge对象还是feed . 我会假设它以某种形式提供了许多项目,并且您可以使用某种流处理器来获得每个收到的文档的结构:

    {
        "date": new Date("2015-03-09 13:23:00.000Z"),
        "symbol": "AAPL",
        "open": 127.14
        "high": 127.17,
        "low": 127.12 
        "close": 127.15,
        "volume": 19734
    }
    

    转换为标准十进制格式以及UTC日期,因为当从数据存储区检索数据时,任何区域设置确实应该是应用程序的域 .

    我还会通过删除对其他集合的引用并将数据放在那里来至少平掉你的“intraDayQuoteSchema” . 您仍然需要在插入时查找,但读取时附加填充的开销似乎比存储开销更昂贵:

    intradayQuotesSchema = Schema({
        symbol:{
            name: String,
            code: String
        },
        day:Date,
        quotes:[quotesSchema]
    });
    

    这取决于您的使用模式,但它可能更有效 .

    其余的真正归结为可以接受的东西

    stream.on(function(data) {
    
        var symbol = data.symbol,
            myDay = new Date( 
                data.date.valueOf() - 
                    ( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
        delete data.symbol;
    
        symbol.findOne({ "code": symbol },function(err,stock) {
    
            intraDayQuote.findOneAndUpdate(
                { "symbol.code": symbol , "day": myDay },
                { "$setOnInsert": { 
                   "symbol.name": stock.name
                   "quotes": [data] 
                }},
                { "upsert": true }
                function(err,doc) {
                    intraDayQuote.findOneAndUpdate(
                        {
                            "symbol.code": symbol,
                            "day": myDay,
                            "quotes.date": data.date
                        },
                        { "$set": { "quotes.$": data } },
                        function(err,doc) {
                            intraDayQuote.findOneAndUpdate(
                                {
                                    "symbol.code": symbol,
                                    "day": myDay,
                                    "quotes.date": { "$ne": data.date }
                                },
                                { "$push": { "quotes": data } },
                                function(err,doc) {
    
                                }
                           );    
                        }
                    );
                }
            );    
        });
    });
    

    如果您在响应中实际上不需要修改后的文档,那么通过在此处实现批量操作API并在单个数据库请求中发送此包中的所有更新,您将获得一些好处:

    stream.on("data",function(data) {
    
        var symbol = data.symbol,
            myDay = new Date( 
                data.date.valueOf() - 
                    ( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
        delete data.symbol;
    
         symbol.findOne({ "code": symbol },function(err,stock) {
             var bulk = intraDayQuote.collection.initializeOrderedBulkOp();
             bulk.find({ "symbol.code": symbol , "day": myDay })
                 .upsert().updateOne({
                     "$setOnInsert": { 
                         "symbol.name": stock.name
                         "quotes": [data] 
                     }
                 });
    
             bulk.find({
                 "symbol.code": symbol,
                 "day": myDay,
                 "quotes.date": data.date
             }).updateOne({
                 "$set": { "quotes.$": data }
             });
    
             bulk.find({
                 "symbol.code": symbol,
                 "day": myDay,
                 "quotes.date": { "$ne": data.date }
             }).updateOne({
                 "$push": { "quotes": data }
             });
    
             bulk.execute(function(err,result) {
                 // maybe do something with the response
             });            
         });
    });
    

    关键是,那里只有一个语句实际上会修改数据,并且因为这些都是在同一个请求中发送的,所以应用程序和服务器之间来回减少了 .

    另一种情况是,在这种情况下,在另一个集合中引用实际数据可能更简单 . 这只是处理upserts的一个简单问题:

    intradayQuotesSchema = Schema({
        symbol:{
            name: String,
            code: String
        },
        day:Date,
        quotes:[{ type: Schema.Types.ObjectId, ref: "quote" }]
    });
    
    
    // and in the steam processor
    
    stream.on("data",function(data) {
    
        var symbol = data.symbol,
            myDay = new Date( 
                data.date.valueOf() - 
                    ( data.date.valueOf() % 1000 * 60 * 60 * 24 ));
        delete data.symbol;
    
        symbol.findOne({ "code": symbol },function(err,stock) {
             quote.update(
                { "date": data.date },
                { "$setOnInsert": data },
                { "upsert": true },
                function(err,num,raw) {
                    if ( !raw.updatedExisting ) {
                        intraDayQuote.update(
                            { "symbol.code": symbol , "day": myDay },
                            { 
                                "$setOnInsert": {
                                    "symbol.name": stock.name
                                },
                                "$addToSet": { "quotes": data }
                            },
                            { "upsert": true },
                            function(err,num,raw) {
    
                            }
                        );
                    }
                }
            );
        });
    });
    

    这真的归结为对于你来说,拥有嵌套在"day"文档中的引号数据的重要性 . 主要区别在于,如果要根据数据查询这些文档中的某些字段,或者使用 .populate() 从其他集合中提取"quotes"的开销 .

    当然,如果引用和引用数据对您的查询过滤很重要,那么您始终可以只查询该集合以查找匹配的 _id 值,并使用"day"文档上的$in查询仅匹配包含匹配"quote"文档的日期 .

    根据应用程序使用数据的方式,最重要的是哪个路径最重要 . 希望这应该指导你做你想要实现的目标背后的一般概念 .

    P.S除非你“确定”你的源数据总是一个四舍五入到精确“分钟”的日期,否则你可能想要使用与用于获得离散“日”的相同类型的日期舍入数学 .

相关问题