我正在寻找使用队列代理来处理我的用例,但我找不到适合我需求的代理 . 也许我的需求并不常见,或者我正在解决我的问题 .

情况:数百万人的人口统计数据信息进入系统 . 这些消息中的每一个都具有该人唯一的标识符 . 不幸的是,发送系统差异很大,有些人的奇怪做法导致同一个人在很短的时间内进入系统的数十个,甚至偶尔有数百个 . 我们当前的排队系统有多个处理器,不知道这些人及其标识符,它基本上是一个基于时间的桶拉和处理 . 因此,我们最终可能会同时处理同一个人,从而导致争用和竞争条件 . 最终,由于数据库事务回滚,它不是数据问题,但显然是很多不需要的处理(因为失败的事务导致消息被重新排队) . 但是,由于下游操作和我们缺乏发送系统知识,我们不能真正忽略或删除某些消息 - 我们必须最终处理它们 .

我理想的解决方案是基于每个人拥有的唯一标识符的队列队列 . 因此,它的工作方式是会有一个主队列不断得到处理 . 每次有一个独特的人进入系统时,就会创建该标识符的新队列 . 如果该人再次进入,则该消息将添加到该唯一标识符队列中 . 因此,当主队列最终使该元素出列时,它按顺序通过该唯一标识符队列 . 由于每个唯一标识符队列的工作将位于单独的线程中,因此主队列可以继续调度元素(达到唯一标识符队列工作线程数量的某些任意限制) . 这允许处理许多消息而不需要由不同的进程处理相同的人 .

我已经广泛地研究了RabbitMQ并且没有看到任何可以实现接近此行为的行为 . 即使我要为每个唯一标识符创建一个新队列,我也看不到可以按顺序处理它们的方法 . 而创造数百万队列的开销将是非常极端的 .

我只是简单地查看了其他排队经纪人,但谷歌搜索队列队列的概念并没有返回太多有用的信息,所以我开始认为这是一个人们不需要解决的问题 . 所以,我的问题是,是否有一个解决方案可以像我描述的那样以类似的方式促进排队队列?如果没有,是否有更好或不同的处理方式?