我一直在和我的程序员争论最好的解决方法 . 我们的数据以每秒约10000个对象的速度进入 . 这需要异步处理,但是松散排序就足够了,因此每个对象都循环插入到几个消息队列之一(还有几个 生产环境 者和消费者) . 每个对象约300个字节 . 它需要持久,因此MQ配置为持久存储到磁盘 .
问题是这些对象经常是重复的(因为它们不可避免地复制到 生产环境 者的数据中) . 它们具有10字节的唯一ID . 如果对象在队列中重复,则不是灾难性的,但是如果它们在从队列中取出后在处理中被复制,那就不是灾难性的 . 什么是确保尽可能接近线性可扩展性同时确保对象处理不重复的最佳方法?也许与此相关,是应该将整个对象存储在消息队列中,还是只将id与body存储在像cassandra这样的内容中?
谢谢!
Edit: 确认重复发生的位置 . 此外,到目前为止我还在考虑使用RabbitMQ . 关于我的要求,每种方法的优缺点是什么?
3 回答
在不知道如何在系统内创建消息的情况下, 生产环境 者用于发布到队列的机制,以及知道队列系统正在使用中,很难诊断正在发生的事情 .
我已经看到这种情况以多种不同的方式发生;超时工作导致消息在队列中再次可见(因此第二次处理,这在Kestrel中很常见),错误配置的代理(HA ActiveMQ浮现在脑海中),错误配置的客户端(Spring加上Camel路由会浮现在脑海中) ,客户双重提交等 . 这种问题可以通过多种方式出现 .
既然我可以在这里插上redis . 您可以轻松地将SPOP(其为O(1),与SADD一样)与pub/sub结合使用,以获得非常快速,恒定的时间,重复的空闲(集合必须包含唯一元素)队列 . 虽然这是一个红宝石项目,但_2523010可能会有所帮助 . 它至少值得一看 .
祝你好运 .
p.s: this is the first time in my life that redis website is having problems, but I bet when you visit it, they have solved the problem
我的第一个建议是查看redis,因为它非常快,我打赌你只需要一个消息队列即可处理所有消息 .
首先,我想向您展示有关我的笔记本电脑的信息(我喜欢它,但是一台大型服务器会更快;)) . 我爸爸(有点印象深刻:))最近买了一台新电脑,它硬打我的笔记本电脑(8 cpu而不是2) .
在我的机器上使用
redis-benchmark
的基准测试下方甚至没有进行太多的redis优化:正如你可以从我的简单笔记本电脑基准测试中看到的那样,你可能只需要一个消息队列,因为redis可以在0.23秒内处理10000 lpush个请求,在0.21秒内处理10000个lpop个请求 . 当你只需要一个队列时,我相信你的问题不再是一个问题(或者是 生产环境 者的产品重复,我完全不理解?) .
redis也坚持光盘 .
使用单个消息队列(框)时,如果我理解正确,则此问题不存在 . 但如果不是你可以只是检查id is member of your set ids . 当你处理id时,你应该remove it from the set ids . 首先,您应该使用sadd将成员添加到列表中 .
如果一个盒子不再缩放,你应该将你的键分成多个盒子并检查那个盒子上的那个键 . 要了解更多相关信息,我认为您应该阅读以下链接:
Redis replication and redis sharding (cluster) difference
http://antirez.com/post/redis-presharding.html
http://redis.io/presentation/Redis_Cluster.pdf
http://blog.zawodny.com/2011/02/26/redis-sharding-at-craigslist/
如果可能的话,你应该将所有信息直接存储到内存中,因为没有什么能像内存一样快速运行(好吧你的cache内存更快但实际上非常小,而且你无法通过你的代码访问它) . Redis会将您的所有信息存储在内存中,并将快照存储到光盘中 . 我认为你应该能够将所有信息存储在内存中,并完全跳过像Cassandra这样的东西 .
让我们考虑每个对象总共400个字节,速率为10000每秒=> 4000000每秒所有对象的字节数= = 4 MB / s,如果我的计算是正确的 . 您可以轻松地将大量信息存储在内存中 . 如果你不能,你应该真的考虑升级你的记忆,如果可能的话,因为内存不再昂贵了 .
如果您不介意将Camel投入混音,那么您可以使用idempotent-consumer EIP来帮助解决这个问题 .
此外,ActiveMQ Message Groups可用于对相关消息进行分组,使它们更容易执行重复检查,并仍然保持高吞吐量等...