首页 文章

防止NodeJS中的并发处理

提问于
浏览
1

我需要NodeJS来防止相同请求的并发操作 . 根据我的理解,如果NodeJS收到多个请求,则会发生以下情况:

REQUEST1 ---> DATABASE_READ
REQUEST2 ---> DATABASE_READ
DATABASE_READ complete ---> EXPENSIVE_OP() --> REQUEST1_END
DATABASE_READ complete ---> EXPENSIVE_OP() --> REQUEST2_END

这导致两个昂贵的操作运行 . 我需要的是这样的事情:

REQUEST1 ---> DATABASE_READ
DATABASE_READ complete ---> DATABASE_UPDATE
DATABASE_UPDATE complete ---> REQUEST2 ---> DATABASE_READ ––> REQUEST2_END
                         ---> EXPENSIVE_OP() --> REQUEST1_END

这就是它在代码中的样子 . 问题是应用程序开始读取缓存值和完成写入缓存值之间的窗口 . 在此窗口期间,并发请求不知道已经存在一个运行相同itemID的请求 .

app.post("/api", async function(req, res) {
    const itemID = req.body.itemID

    // See if itemID is processing
    const processing = await DATABASE_READ(itemID)
    // Due to how NodeJS works, 
    // from this point in time all requests
    // to /api?itemID="xxx" will have processing = false 
    // and will conduct expensive operations

    if (processing == true) {
        // "Cheap" part
        // Tell client to wait until itemID is processed
    } else {
        // "Expensive" part
        DATABASE_UPDATE({[itemID]: true})
        // All requests to /api at this point
        // are still going here and conducting 
        // duplicate operations.
        // Only after DATABASE_UPDATE finishes, 
        // all requests go to the "Cheap" part
        DO_EXPENSIVE_THINGS();
    }
}

编辑

我当然可以这样做:

const lockedIDs = {}
app.post("/api", function(req, res) {
    const itemID = req.body.itemID
    const locked = lockedIDs[itemID] ? true : false // sync equivalent to async DATABASE_READ(itemID)
    if (locked) {
        // Tell client to wait until itemID is processed
        // No need to do expensive operations
    } else {
        lockedIDs[itemID] = true // sync equivalent to async DATABASE_UPDATE({[itemID]: true})
        // Do expensive operations
        // itemID is now "locked", so subsequent request will not go here
    }
}

lockedIDs 此处的行为类似于内存中的 synchronous 键值数据库 . 没关系,如果它只是一台服务器 . 但是,如果有多个服务器实例呢?我需要一个单独的缓存存储,比如Redis . 我只能访问Redis asynchronously . 不幸的是,这不会起作用 .

2 回答

  • 1

    您可以创建一个本地 Map 对象(在内存中用于同步访问),该对象包含任何itemID作为正在处理的键 . 您可以将该密钥的值设置为一个承诺,该承诺可以解析之前处理该密钥的任何人的结果 . 我认为这就像守门员 . 它跟踪正在处理的itemID .

    此方案告诉将来请求相同的itemID等待并且不阻止其他请求 - 我认为这很重要,而不是仅仅对与itemID处理相关的所有请求使用全局锁定 .

    然后,作为处理的一部分,首先检查本地Map对象 . 如果该密钥在那里,那么它当前正在处理中 . 然后,您可以等待来自Map对象的promise,以查看它何时完成处理并获得先前处理的任何结果 .

    如果它不在Map对象中,那么它现在不会被处理,您可以立即将它放在Map中以将其标记为“正在进行中” . 如果将promise设置为值,则可以使用此对象处理获得的任何结果来解析该promise .

    任何其他请求将最终只是等待该承诺,因此您将只处理此ID一次 . 第一个以该ID开头的将处理它,并且在处理时出现的所有其他请求将使用相同的共享结果(从而节省重复计算的重复) .

    我尝试编写一个示例代码,但并不真正理解你的伪代码试图做得足够好以提供代码示例 .

    像这样的系统必须有完美的错误处理,以便所有可能的错误路径正确处理 Map 中嵌入的 Map 和promise .

    基于您相当轻松的伪代码示例,这里有一个类似的伪代码示例,说明了上述概念:

    const itemInProcessCache = new Map();
    
    app.get("/api", async function(req, res) {
        const itemID = req.query.itemID
        let gate = itemInProcessCache.get(itemID);
        if (gate) {
            gate.then(val => {
                // use cached result here from previous processing
            }).catch(err => {
                // decide what to do when previous processing had an error
            });
        } else {
            let p = DATABASE_UPDATE({itemID: true}).then(result => {
                // expensive processing done
                // return final value so any others waiting on the gate can just use that value
                // decide if you want to clear this item from itemInProcessCache or not
            }).catch(err => {
                // error on expensive processing
    
                // remove from the gate cache because we didn't get a result
                // expensive processing will have to be done by someone else
                itemInProcessCache.delete(itemID);
            });
            // mark this item as being processed
            itemInProcessCache.set(itemID, p);
        }
    });
    

    注意:这依赖于node.js的单线程 . 在此处的请求处理程序返回之前,没有其他请求可以启动,以便在此itemID的任何其他请求开始之前调用 itemInProcessCache.set(itemID, p); .


    此外,我不太了解数据库,但这看起来非常像一个好的多用户数据库可能已内置或具有支持功能的功能,这使得这更容易,因为不想让多个不是一个不常见的想法请求所有尝试执行相同的数据库工作(或者更糟糕的是,打扰彼此的工作) .

  • 2

    好吧,让我解决一下 .

    所以,我对这个问题的问题在于你已经将这个问题抽象得太多了,以至于很难帮助你进行优化 . 目前尚不清楚“长时间运行的进程”正在做什么,它正在做什么将影响如何解决处理多个并发请求的挑战 . 您担心消耗资源的API是什么?

    从您的代码开始,我首先猜到你正在开始某种长期工作(例如文件转换或其他东西),但随后的一些编辑和评论让我觉得它可能只是一个复杂的查询数据库需要大量计算才能正确,因此您希望缓存查询结果 . 但是我也可以看到它是其他的东西,比如对你正在聚合的一堆第三方API的查询 . 每个场景都有一些细微差别,可以改变最佳状态 .

    也就是说,我将解释'缓存'方案,你可以告诉我你是否对其他解决方案更感兴趣 .

    基本上,你已经're in the right ballpark for the cache already. If you haven'已经,我建议你查看cache-manager,它会为这些场景稍微简化你的样板(让's you set cache invalidation and even have multi-tier caching). The piece that you'缺失的是你基本上应该总是回应你在缓存中的任何内容,并在外面填充缓存任何给定请求的范围 . 使用您的代码作为起点,类似这样的事情(省略所有try..catches和错误检查,以简化):

    // A GET is OK here, because no matter what we're firing back a response quickly, 
    //      and semantically this is a query
    app.get("/api", async function(req, res) {
        const itemID = req.query.itemID
    
        // In this case, I'm assuming you have a cache object that basically gets whatever
        //    is cached in your cache storage and can set new things there too.  
        let item = await cache.get(itemID)
    
        // Item isn't in the cache at all, so this is the very first attempt.  
        if (!item) {
            // go ahead and let the client know we'll get to it later. 202 Accepted should 
            //   be fine, but pick your own status code to let them know it's in process. 
            //   Other good options include [503 Service Unavailable with a retry-after 
            //   header][2] and [420 Enhance Your Calm][2] (non-standard, but funny)
            res.status(202).send({ id: itemID });
    
            // put an empty object in there so we know it's working on it. 
            await cache.set(itemID, {}); 
    
            // start the long-running process, which should update the cache when it's done
            await populateCache(itemID); 
            return;
        }
        // Here we have an item in the cache, but it's not done processing.  Maybe you 
        //     could just check to see if it's an empty object or not, but I'm assuming 
        //     that we've setup a boolean flag on the cached object for when it's done.
        if (!item.processed) {
            // The client should try again later like above.  Exit early. You could 
            //    alternatively send the partial item, an empty object, or a message. 
           return res.status(202).send({ id: itemID });
        } 
    
        // if we get here, the item is in the cache and done processing. 
        return res.send(item);
    }
    

    现在,我不是't know precisely what all your stuff does, but if it' s我, populateCache 来自上面是一个非常简单的功能只需调用我们用于执行长时间运行的任何服务,然后将其放入缓存中 .

    async function populateCache(itemId) {
       const item = await service.createThisWorkOfArt(itemId);
       await cache.set(itemId, item); 
       return; 
    }
    

    如果不清楚或者您的情景与我猜测的情况有什么不同,请告诉我 .

    正如评论中所提到的,这种方法将涵盖您在描述的场景中可能遇到的大多数正常问题,但如果它们比写入缓存存储的速度快,它仍然会允许两个请求同时触发长时间运行的进程 . (例如Redis) . 我判断发生这种情况的可能性非常低,但是如果你在缓存中没有任何内容,那么就像我上面那样做出反应,但是完全删除了实际上完全调用 populateCache 的块 .

    相反,您将运行一个单独的工作进程,该进程将定期(通常取决于您的业务案例)检查缓存中是否有未处理的作业,并启动处理它们的工作 . 通过这种方式,即使您对同一项目有1000个并发请求,您也可以确保只处理一次 . 当然,缺点是您可以将检查的周期性添加到获取完全处理数据的延迟中 .

相关问题