这可能是一个特例:
我想从队列中读取(AWS SQS),这是通过对消息进行等待几秒钟的调用来完成的,然后解析 - 并且只要您想要处理该队列,就可以在循环中反复调用(它)每次检查一个标志) .
这意味着我有一个 consume
函数,只要应用程序处于活动状态,或者队列未标记就会运行 .
我还有一个用于订阅队列的 subscribe
函数 - 一旦知道消费者能够连接到队列就应该解析 . 即使这个函数调用持续运行的消费者,并且在队列未标记之前也不会返回 .
它给了我一些挑战 - 你有没有关于如何用现代JS和async / await promises解决这个问题的技巧?我记住这个代码在React Web应用程序中运行,而不是在node.js中运行 .
我基本上只是希望 await subscribe(QUEUE)
调用(来自GUI)在它确定可以从该队列中读取时立即解析 . 但如果它不能,我希望它抛出一个错误,它传播到订阅调用的原点 - 这意味着我必须 await consume(QUEUE)
,对吧?
更新:添加了一些未经测试的草稿代码(我不想花更多的时间让它工作,如果我没有采取正确的方法) - 我考虑将成功和失败回调发送到消费函数,因此它可以报告一旦它从队列获得第一个有效(但可能是空的)响应就会成功,这使得它将队列URL存储为订阅 - 并且如果队列轮询失败则取消订阅 .
由于我正在设置几个队列消费者,因此他们不应该阻止任何事情,而只是在后台工作
let subscribedQueueURLs = []
async function consumeQueue(
url: QueueURL,
success: () => mixed,
failure: (error: Error) => mixed
) {
const sqs = new AWS.SQS()
const params = {
QueueUrl: url,
WaitTimeSeconds: 20,
}
try {
do {
// eslint-disable-next-line no-await-in-loop
const receivedData = await sqs.receiveMessage(params).promise()
if (!subscribedQueueURLs.includes(url)) {
success()
}
// eslint-disable-next-line no-restricted-syntax
for (const message of receivedData.Messages) {
console.log({ message })
// eslint-disable-next-line no-await-in-loop
eventHandler && (await eventHandler.message(message, url))
const deleteParams = {
QueueUrl: url,
ReceiptHandle: message.ReceiptHandle,
}
// eslint-disable-next-line no-await-in-loop
const deleteResult = await sqs.deleteMessage(deleteParams).promise()
console.log({ deleteResult })
}
} while (subscribedQueueURLs.includes(url))
} catch (error) {
failure(error)
}
}
export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
const url = generateQueueURL(entityType, entityId)
consumeQueue(
url,
() => {
subscribedQueueURLs.push(url)
eventHandler && eventHandler.subscribe(url)
},
error => {
console.error(error)
unsubscribe(entityType, entityId)
}
)
}
1 回答
我最终解决了这个问题 - 虽然可能不是最优雅的解决方案......