首页 文章

在SQS队列中使用许多使用者

提问于
浏览
5

我知道可以使用多个线程来使用SQS队列 . 我想保证每条消息都会消耗一次 . 我知道可以更改消息的 visibility timeout ,例如,等于我的处理时间 . 如果我的进程花费的时间多于 visibility timeout (例如,慢速连接),则其他线程可以使用相同的消息 .

保证邮件一次处理的最佳方法是什么?

4 回答

  • 21

    您可以对消息和批处理使用setVisibilityTimeout(),以便延长可见时间,直到线程完成处理消息 .

    这可以通过使用scheduledExecutorService来完成,并在初始可见时间的一半之后安排可运行事件 . 下面的代码片段在visibilityTime的每一半创建并执行VisibilityTimeExtender,其可见时间为一半 . (时间应保证消息处理,使用visibilityTime / 2进行扩展)

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    ScheduledFuture<?> futureEvent = scheduler.scheduleAtFixedRate(new VisibilityTimeExtender(..), visibilityTime/2, visibilityTime/2, TimeUnit.SECONDS);
    

    VisibilityTimeExtender必须实现Runnable,并且您可以在其中更新新的可见性时间 .

    线程完成处理消息后,您可以从队列中删除它,并调用futureEvent.cancel(true)来停止预定的事件 .

  • 1

    在收到消息时,将消息或对消息的引用存储在消息ID上具有唯一约束的数据库中 . 如果表中存在ID,您已经收到它,并且由于唯一约束,数据库将不允许您再次插入它 .

  • 1

    当您使用API等读取消息时,AWS SQS API不会自动“使用”消息 . 开发人员需要拨打电话来自行删除邮件 .

    SQS确实有一个功能称为“重启策略”作为“死信队列设置”的一部分 . 您只需将读取请求设置为1.如果使用过程崩溃,则后续读取相同的消息会将消息放入死信队列 .

    SQS队列可见性超时最长可设置为12小时 . 除非您有特殊需要,否则您需要实现将消息处理程序存储在数据库中以允许其进行检查的过程 .

  • 0

    保证邮件处理一次的最佳方法是什么?

    你要求保证 - you won't get one . 您可以将消息处理的可能性降低多次到 very small amount ,但是您无法获得保证 .

    我将解释为什么,以及减少重复的策略 .

    重复来自何处

    • 当您在SQS中放入消息时,SQS可能实际上不止一次收到该消息

    • 例如:发送消息时发生轻微的网络打嗝导致自动重试的瞬态错误 - 从消息发送者的角度来看,它失败一次,并成功发送一次,但SQS收到了这两个消息 .

    • SQS can internally generate duplicates

    • Simlar到第一个例子 - 有很多计算机处理消息,SQS需要确保没有丢失 - 消息存储在多个服务器上,这可能导致重复 .

    在大多数情况下,通过利用SQS message visibility timeout,从这些来源重复的可能性已经相当小 - 比如小百分之几 .

    如果处理副本确实不是那么糟糕(strive to make your message consumption idempotent!),我会认为这很好 - 减少重复的可能性进一步复杂且可能很昂贵......


    您的应用程序可以做些什么来进一步减少重复?

    好的,这里我们沿着兔子洞走下去...在高级别,您将需要为您的消息分配唯一的ID,并在开始处理之前检查正在进行或已完成的ID的原子缓存:

    • 确保您的邮件在插入时提供了唯一标识符

    • 如果没有这个,你将无法区分重复 .

    • 处理'end of the line'处的重复邮件 .

    • 如果您的消息接收者需要发送消息以供进一步处理,那么它可能是另一个重复源(出于上述类似的原因)

    • 你需要某个地方以原子方式存储和检查这些唯一ID(并在超时后刷新它们) . 有两个重要的状态:"InProgress"和"Completed"

    • InProgress条目应根据处理失败时需要恢复的速度超时 .

    • 已完成的条目应根据您希望重复数据删除窗口的持续时间而超时

    • 最简单的可能是Guava cache,但只对单个处理应用程序有用 . 如果您有大量消息或分布式消耗,请考虑此作业的数据库(使用后台进程扫描过期条目)

    • 在处理消息之前,尝试将messageId存储在"InProgress"中 . 如果已经存在,请停止 - 您只需处理一份副本 .

    • 检查消息是否为"Completed"(如果在那里则停止)

    • Your thread now has an exclusive lock on that messageId - Process your message

    • 将messageId标记为"Completed" - As long as this messageId stays here, you won't process any duplicates for that messageId .

    • 你可能无法承受无限的存储空间 .

    • 从"InProgress"删除messageId(或者让它从此处过期)

    一些笔记

    • 请记住,没有所有这些重复的可能性已经非常低了 . 根据消息的重复数据删除时间和金钱的 Value ,您可以随意跳过或修改任何步骤

    • 例如,您可以省略"InProgress",但这会让两个线程同时处理重复消息的可能性很小(第二个线程在第一个消息开始之前有"Completed"它)

    • 您的重复数据删除窗口只要您在"Completed"中保留messageIds即可 . 由于您可能无法承受无限存储空间,因此最后至少要长达2倍的SQS消息可见性超时;之后重复的可能性降低(除了已经非常低的机会,但仍然无法保证) .

    • 即使有了这一切, there is still a chance of duplication - 所有预防措施和SQS消息可见性超时有助于将这个机会减少到非常小,但机会仍然存在:

    • 你的应用程序可以在处理完邮件之后立即崩溃/挂起/执行一个很长的GC,但是在messageId是_91007之前(可能你正在为这个存储使用数据库并且与它的连接已关闭)

    • 在这种情况下,"Processing"将最终到期,另一个线程可以处理此消息(在SQS可见性超时也到期之后或者因为SQS中有重复) .

相关问题