首页 文章

ES7承诺并等待异步功能,它在后台永远循环

提问于
浏览
-1

这可能是一个特例:

我想从队列中读取(AWS SQS),这是通过对消息进行等待几秒钟的调用来完成的,然后解析 - 并且只要您想要处理该队列,就可以在循环中反复调用(它)每次检查一个标志) .

这意味着我有一个 consume 函数,只要应用程序处于活动状态,或者队列未标记就会运行 .

我还有一个用于订阅队列的 subscribe 函数 - 一旦知道消费者能够连接到队列就应该解析 . 即使这个函数调用持续运行的消费者,并且在队列未标记之前也不会返回 .

它给了我一些挑战 - 你有没有关于如何用现代JS和async / await promises解决这个问题的技巧?我记住这个代码在React Web应用程序中运行,而不是在node.js中运行 .

我基本上只是希望 await subscribe(QUEUE) 调用(来自GUI)在它确定可以从该队列中读取时立即解析 . 但如果它不能,我希望它抛出一个错误,它传播到订阅调用的原点 - 这意味着我必须 await consume(QUEUE) ,对吧?

更新:添加了一些未经测试的草稿代码(我不想花更多的时间让它工作,如果我没有采取正确的方法) - 我考虑将成功和失败回调发送到消费函数,因此它可以报告一旦它从队列获得第一个有效(但可能是空的)响应就会成功,这使得它将队列URL存储为订阅 - 并且如果队列轮询失败则取消订阅 .

由于我正在设置几个队列消费者,因此他们不应该阻止任何事情,而只是在后台工作

let subscribedQueueURLs = []

async function consumeQueue(
  url: QueueURL,
  success: () => mixed,
  failure: (error: Error) => mixed
) {
  const sqs = new AWS.SQS()
  const params = {
    QueueUrl: url,
    WaitTimeSeconds: 20,
  }

  try {
    do {
      // eslint-disable-next-line no-await-in-loop
      const receivedData = await sqs.receiveMessage(params).promise()
      if (!subscribedQueueURLs.includes(url)) {
        success()
      }
      // eslint-disable-next-line no-restricted-syntax
      for (const message of receivedData.Messages) {
        console.log({ message })
        // eslint-disable-next-line no-await-in-loop
        eventHandler && (await eventHandler.message(message, url))

        const deleteParams = {
          QueueUrl: url,
          ReceiptHandle: message.ReceiptHandle,
        }
        // eslint-disable-next-line no-await-in-loop
        const deleteResult = await sqs.deleteMessage(deleteParams).promise()
        console.log({ deleteResult })
      }
    } while (subscribedQueueURLs.includes(url))
  } catch (error) {
    failure(error)
  }
}

export const subscribe = async (entityType: EntityType, entityId: EntityId) => {
  const url = generateQueueURL(entityType, entityId)
  consumeQueue(
    url,
    () => {
      subscribedQueueURLs.push(url)
      eventHandler && eventHandler.subscribe(url)
    },
    error => {
      console.error(error)
      unsubscribe(entityType, entityId)
    }
  )
}

1 回答

  • 0

    我最终解决了这个问题 - 虽然可能不是最优雅的解决方案......

    let eventHandler: ?EventHandler
    let awsOptions: ?AWSOptions
    let subscribedQueueUrls = []
    let sqs = null
    let sns = null
    
    
    export function setup(handler: EventHandler) {
      eventHandler = handler
    }
    
    export async function login(
      { awsKey, awsSecret, awsRegion }: AWSCredentials,
      autoReconnect: boolean
    ) {
      const credentials = new AWS.Credentials(awsKey, awsSecret)
      AWS.config.update({ region: awsRegion, credentials })
      sqs = new AWS.SQS({ apiVersion: '2012-11-05' })
      sns = new AWS.SNS({ apiVersion: '2010-03-31' })
      const sts = new AWS.STS({ apiVersion: '2011-06-15' })
      const { Account } = await sts.getCallerIdentity().promise()
      awsOptions = { accountId: Account, region: awsRegion }
      eventHandler && eventHandler.login({ awsRegion, awsKey, awsSecret }, autoReconnect)
    }
    
    
    async function handleQueueMessages(messages, queueUrl) {
      if (!sqs) {
        throw new Error(
          'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
        )
      }
      // eslint-disable-next-line no-restricted-syntax
      for (const message of messages) {
        if (!eventHandler) {
          return
        }
        // eslint-disable-next-line no-await-in-loop
        await eventHandler.message({
          content: message,
          queueUrl,
          timestamp: new Date().toISOString(),
        })
    
        const deleteParams = {
          QueueUrl: queueUrl,
          ReceiptHandle: message.ReceiptHandle,
        }
        // eslint-disable-next-line no-await-in-loop
        await sqs.deleteMessage(deleteParams).promise()
      }
    }
    
    
    export async function subscribe(queueUrl: QueueUrl) {
      if (!sqs) {
        throw new Error(
          'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
        )
      }
    
      const initialParams = {
        QueueUrl: queueUrl,
        WaitTimeSeconds: 0,
        MessageAttributeNames: ['All'],
        AttributeNames: ['All'],
      }
    
      const longPollParams = {
        ...initialParams,
        WaitTimeSeconds: 20,
      }
    
      // Attempt to consume the queue, and handle any pending messages.
      const firstResponse = await sqs.receiveMessage(initialParams).promise()
      if (!subscribedQueueUrls.includes(queueUrl)) {
        subscribedQueueUrls.push(queueUrl)
        eventHandler && eventHandler.subscribe(queueUrl)
      }
      handleQueueMessages(firstResponse.Messages, queueUrl)
    
      // Keep on polling the queue afterwards.
      setImmediate(async () => {
        if (!sqs) {
          throw new Error(
            'Attempt to subscribe before SQS client is ready (i.e. authenticated).'
          )
        }
        try {
          do {
            // eslint-disable-next-line no-await-in-loop
            const received = await sqs.receiveMessage(longPollParams).promise()
    
            handleQueueMessages(received.Messages, queueUrl)
          } while (sqs && subscribedQueueUrls.includes(queueUrl))
        } catch (error) {
          eventHandler && eventHandler.disconnect()
          throw error
        }
      })
    }
    

相关问题