
Handling duplicates in message queues


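I've been arguing with a fellow programmer about the best way to solve this. We have data that comes in at a rate of about 10000 objects per second. This needs to be processed asynchronously, but loose ordering is sufficient, so each object is inserted round-robin into one of several message queues (there are also several producers and consumers). Each object is about 300 bytes. It also needs to be durable, so the MQs are configured to persist to disk.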

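The problem is that these objects are often duplicated (because duplicates inevitably occur in the data that reaches the producers). They do have 10-byte unique ids. It's not catastrophic if objects are duplicated in the queue, but it is catastrophic if they're duplicated in the processing after being taken from the queue. What's the best way to get as close as possible to linear scalability while ensuring there's no duplication in the processing of the objects? And perhaps related to this, should the whole object be stored in the message queue, or only the id, with the body stored in something like Cassandra?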

Thanks!

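Edit: Confirmed where the duplication occurs. Also, so far I have been considering RabbitMQ. What are the pros and cons of each approach with regard to my requirements?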

3 Answers

  • 3

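    Without knowing how the messages are created within the system, the mechanism the producers use to publish to the queue, and which queue system is in use, it's hard to diagnose what is going on.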

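    I've seen this happen in a number of different ways: timed-out workers causing the message to become visible in the queue again (and so be processed a second time, which is common with Kestrel), misconfigured brokers (HA ActiveMQ comes to mind), misconfigured clients (Spring plus Camel routing comes to mind), clients double-submitting, and so on. There are many ways this kind of problem can arise.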

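    That said, let me plug Redis here. You can easily combine SPOP (which is O(1), as is SADD) with pub/sub to get a very fast, constant-time, duplicate-free queue (a set can only contain unique elements). Although it's a Ruby project, _2523010 may be able to help. It's at least worth a look.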
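
    As a rough illustration of the set-as-queue idea above, here is a minimal Python sketch. It is not taken from the linked project. The helper names `enqueue` and `dequeue` and the `InMemorySetStore` class are hypothetical; the store only imitates the `sadd` and `spop` method names of the redis-py client so that the example runs without a server, and the pub/sub notification part is left out.

```python
import random


class InMemorySetStore:
    """Hypothetical stand-in for a Redis client (mimics redis-py's sadd/spop)."""

    def __init__(self):
        self._sets = {}

    def sadd(self, name, *values):
        # Like Redis SADD: return how many of the values were newly added.
        target = self._sets.setdefault(name, set())
        before = len(target)
        target.update(values)
        return len(target) - before

    def spop(self, name):
        # Like Redis SPOP: remove and return a random member, None if empty.
        target = self._sets.get(name)
        if not target:
            return None
        member = random.choice(tuple(target))
        target.remove(member)
        return member


def enqueue(client, queue_name, object_id):
    """Add an id to the set-backed queue; True if it was not already queued."""
    return client.sadd(queue_name, object_id) == 1


def dequeue(client, queue_name):
    """Take an arbitrary id off the queue, or None when the queue is empty."""
    return client.spop(queue_name)


if __name__ == "__main__":
    store = InMemorySetStore()
    for object_id in [b"id-1", b"id-2", b"id-1"]:
        queued = enqueue(store, "pending", object_id)
        print(object_id, "queued" if queued else "duplicate, ignored")
```

    Note that a set only removes duplicates that are in the queue at the same time, and SPOP returns members in no particular order, which fits the "loose ordering is sufficient" requirement but gives no FIFO guarantee.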

    Good luck.

  • 2

    P.S.: This is the first time in my life that the Redis website is having problems, but I bet they will have solved the problem by the time you visit it.

    > We have data that comes in at a rate
    > of about 10000 objects per second.
    > This needs to be processed
    > asynchronously, but loose ordering is
    > sufficient, so each object is inserted
    > round-robin-ly into one of several
    > message queues (there are also several
    > producers and consumers)
    

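    My first suggestion is to look at Redis, because it is insanely fast, and I bet you can handle all your messages with just a single message queue.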

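    First, let me show you some information about my laptop (I like it, but a big server would be a lot faster ;)). My dad (he was a bit impressed :)) recently bought a new PC that beats my laptop hard (8 CPUs instead of 2).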

    -Computer-
    Processor       : 2x Intel(R) Core(TM)2 Duo CPU     T7100  @ 1.80GHz
    Memory      : 2051MB (1152MB used)
    Operating System        : Ubuntu 10.10
    User Name       : alfred (alfred)
    -Display-
    Resolution      : 1920x1080 pixels
    OpenGL Renderer     : Unknown
    X11 Vendor      : The X.Org Foundation
    -Multimedia-
    Audio Adapter       : HDA-Intel - HDA Intel
    -Input Devices-
     Power Button
     Lid Switch
     Sleep Button
     Power Button
     AT Translated Set 2 keyboard
     Microsoft Comfort Curve Keyboard 2000
     Microsoft Comfort Curve Keyboard 2000
     Logitech Trackball
     Video Bus
     PS/2 Logitech Wheel Mouse
    -SCSI Disks-
    HL-DT-ST DVDRAM GSA-T20N
    ATA WDC WD1600BEVS-2
    

    Below are the results of running redis-benchmark on my machine, without even doing much Redis optimization:

    alfred@alfred-laptop:~/database/redis-2.2.0-rc4/src$ ./redis-benchmark 
    ====== PING (inline) ======
      10000 requests completed in 0.22 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    94.84% <= 1 milliseconds
    98.74% <= 2 milliseconds
    99.65% <= 3 milliseconds
    100.00% <= 4 milliseconds
    46296.30 requests per second
    
    ====== PING ======
      10000 requests completed in 0.22 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    91.30% <= 1 milliseconds
    98.21% <= 2 milliseconds
    99.29% <= 3 milliseconds
    99.52% <= 4 milliseconds
    100.00% <= 4 milliseconds
    45662.10 requests per second
    
    ====== MSET (10 keys) ======
      10000 requests completed in 0.32 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    3.45% <= 1 milliseconds
    88.55% <= 2 milliseconds
    97.86% <= 3 milliseconds
    98.92% <= 4 milliseconds
    99.80% <= 5 milliseconds
    99.94% <= 6 milliseconds
    99.95% <= 9 milliseconds
    99.96% <= 10 milliseconds
    100.00% <= 10 milliseconds
    30864.20 requests per second
    
    ====== SET ======
      10000 requests completed in 0.21 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    92.45% <= 1 milliseconds
    98.78% <= 2 milliseconds
    99.00% <= 3 milliseconds
    99.01% <= 4 milliseconds
    99.53% <= 5 milliseconds
    100.00% <= 5 milliseconds
    47169.81 requests per second
    
    ====== GET ======
      10000 requests completed in 0.21 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    94.50% <= 1 milliseconds
    98.21% <= 2 milliseconds
    99.50% <= 3 milliseconds
    100.00% <= 3 milliseconds
    47619.05 requests per second
    
    ====== INCR ======
      10000 requests completed in 0.23 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    91.90% <= 1 milliseconds
    97.45% <= 2 milliseconds
    98.59% <= 3 milliseconds
    99.51% <= 10 milliseconds
    99.78% <= 11 milliseconds
    100.00% <= 11 milliseconds
    44444.45 requests per second
    
    ====== LPUSH ======
      10000 requests completed in 0.21 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    95.02% <= 1 milliseconds
    98.51% <= 2 milliseconds
    99.23% <= 3 milliseconds
    99.51% <= 5 milliseconds
    99.52% <= 6 milliseconds
    100.00% <= 6 milliseconds
    47619.05 requests per second
    
    ====== LPOP ======
      10000 requests completed in 0.21 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    95.89% <= 1 milliseconds
    98.69% <= 2 milliseconds
    98.96% <= 3 milliseconds
    99.51% <= 5 milliseconds
    99.98% <= 6 milliseconds
    100.00% <= 6 milliseconds
    47619.05 requests per second
    
    ====== SADD ======
      10000 requests completed in 0.22 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    91.08% <= 1 milliseconds
    97.79% <= 2 milliseconds
    98.61% <= 3 milliseconds
    99.25% <= 4 milliseconds
    99.51% <= 5 milliseconds
    99.81% <= 6 milliseconds
    100.00% <= 6 milliseconds
    45454.55 requests per second
    
    ====== SPOP ======
      10000 requests completed in 0.22 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    91.88% <= 1 milliseconds
    98.64% <= 2 milliseconds
    99.09% <= 3 milliseconds
    99.40% <= 4 milliseconds
    99.48% <= 5 milliseconds
    99.60% <= 6 milliseconds
    99.98% <= 11 milliseconds
    100.00% <= 11 milliseconds
    46296.30 requests per second
    
    ====== LPUSH (again, in order to bench LRANGE) ======
      10000 requests completed in 0.23 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    91.00% <= 1 milliseconds
    97.82% <= 2 milliseconds
    99.01% <= 3 milliseconds
    99.56% <= 4 milliseconds
    99.73% <= 5 milliseconds
    99.77% <= 7 milliseconds
    100.00% <= 7 milliseconds
    44247.79 requests per second
    
    ====== LRANGE (first 100 elements) ======
      10000 requests completed in 0.39 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    6.24% <= 1 milliseconds
    75.78% <= 2 milliseconds
    93.69% <= 3 milliseconds
    97.29% <= 4 milliseconds
    98.74% <= 5 milliseconds
    99.45% <= 6 milliseconds
    99.52% <= 7 milliseconds
    99.93% <= 8 milliseconds
    100.00% <= 8 milliseconds
    25906.74 requests per second
    
    ====== LRANGE (first 300 elements) ======
      10000 requests completed in 0.78 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    1.30% <= 1 milliseconds
    5.07% <= 2 milliseconds
    36.42% <= 3 milliseconds
    72.75% <= 4 milliseconds
    93.26% <= 5 milliseconds
    97.36% <= 6 milliseconds
    98.72% <= 7 milliseconds
    99.35% <= 8 milliseconds
    100.00% <= 8 milliseconds
    12886.60 requests per second
    
    ====== LRANGE (first 450 elements) ======
      10000 requests completed in 1.10 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    0.67% <= 1 milliseconds
    3.64% <= 2 milliseconds
    8.01% <= 3 milliseconds
    23.59% <= 4 milliseconds
    56.69% <= 5 milliseconds
    76.34% <= 6 milliseconds
    90.00% <= 7 milliseconds
    96.92% <= 8 milliseconds
    98.55% <= 9 milliseconds
    99.06% <= 10 milliseconds
    99.53% <= 11 milliseconds
    100.00% <= 11 milliseconds
    9066.18 requests per second
    
    ====== LRANGE (first 600 elements) ======
      10000 requests completed in 1.48 seconds
      50 parallel clients
      3 bytes payload
      keep alive: 1
    
    0.85% <= 1 milliseconds
    9.23% <= 2 milliseconds
    11.03% <= 3 milliseconds
    15.94% <= 4 milliseconds
    27.55% <= 5 milliseconds
    41.10% <= 6 milliseconds
    56.23% <= 7 milliseconds
    78.41% <= 8 milliseconds
    87.37% <= 9 milliseconds
    92.81% <= 10 milliseconds
    95.10% <= 11 milliseconds
    97.03% <= 12 milliseconds
    98.46% <= 13 milliseconds
    99.05% <= 14 milliseconds
    99.37% <= 15 milliseconds
    99.40% <= 17 milliseconds
    99.67% <= 18 milliseconds
    99.81% <= 19 milliseconds
    99.97% <= 20 milliseconds
    100.00% <= 20 milliseconds
    6752.19 requests per second
    

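    As you can see from the benchmark on my simple laptop, you probably need just one message queue, because Redis can handle 10000 LPUSH requests in 0.23 seconds and 10000 LPOP requests in 0.21 seconds. When you need just one queue, I believe your problem is no longer a problem (or do the producers themselves produce the duplicates? I don't completely understand that part).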

    > And it needs to be durable, so the MQs
    > are configured to persist to disk.
    

    Redis also persists to disk.

    > The problem is that often these
    > objects are duplicated. They do have
    > 10-byte unique ids. It's not
    > catastrophic if objects are duplicated
    > in the queue, but it is if they're
    > duplicated in the processing after
    > being taken from the queue. What's the
    > best way to go about ensuring as close
    > as possible to linear scalability
    > whilst ensuring there's no duplication
    > in the processing of the objects?
    

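    With a single message queue (box), this problem does not exist, if I understand correctly. If it does, you can simply check whether the id is a member of your set of ids (SISMEMBER). Once you have processed the id, you should remove it from the set of ids (SREM). Of course, you first have to add the members to the set using SADD.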
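
    The check described above could be sketched as follows. This is a variation rather than the exact sequence in the answer: instead of a separate membership check followed by an add, it relies on the fact that SADD returns 1 only when the id was not yet in the set, so checking and marking happen in one step. `process_once`, the `seen_ids` key, and the `InMemorySetStore` class are hypothetical names; the store imitates the `sadd` and `srem` method names of redis-py so that the sketch runs without a server.

```python
class InMemorySetStore:
    """Hypothetical stand-in for a Redis client (mimics redis-py's sadd/srem)."""

    def __init__(self):
        self._sets = {}

    def sadd(self, name, *values):
        # Like Redis SADD: return how many of the values were newly added.
        target = self._sets.setdefault(name, set())
        before = len(target)
        target.update(values)
        return len(target) - before

    def srem(self, name, *values):
        # Like Redis SREM: return how many of the values were actually removed.
        target = self._sets.get(name, set())
        before = len(target)
        target.difference_update(values)
        return before - len(target)


def process_once(client, object_id, handler, seen_key="seen_ids"):
    """Run handler(object_id) unless this id has already been claimed.

    Returns True if the handler ran, False if the id was a duplicate.
    """
    # Only the first caller to add the id gets 1 back and may process it.
    if client.sadd(seen_key, object_id) == 0:
        return False
    try:
        handler(object_id)
    except Exception:
        # Assumption: a failed object should be retried, so release the claim.
        client.srem(seen_key, object_id)
        raise
    return True


if __name__ == "__main__":
    store = InMemorySetStore()
    for object_id in [b"0123456789", b"abcdefghij", b"0123456789"]:
        ran = process_once(store, object_id, lambda oid: print("processing", oid))
        if not ran:
            print("skipped duplicate", object_id)
```

    In this sketch the set of seen ids grows without bound. Whether and when ids can be removed again depends on how long after the original a duplicate can still arrive, which the question does not state.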

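    If one box no longer scales, you should shard your keys across multiple boxes and check for a key on the box it belongs to. To learn more about this, I think you should read the following links: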
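
    As a rough illustration of splitting keys across boxes, the sketch below maps each 10-byte id to one of N boxes with a stable hash, so every duplicate of an id is checked on the same box. The function name `shard_for` and the box names are hypothetical, and CRC32 with a modulo is just one simple choice, not something the answer prescribes.

```python
import zlib


def shard_for(object_id: bytes, shard_count: int) -> int:
    """Return the index of the box responsible for this id."""
    if shard_count <= 0:
        raise ValueError("shard_count must be positive")
    # crc32 is stable across processes and machines, unlike Python's hash().
    return zlib.crc32(object_id) % shard_count


if __name__ == "__main__":
    boxes = ["box-a", "box-b", "box-c"]  # hypothetical host names
    for object_id in [b"0123456789", b"abcdefghij", b"0123456789"]:
        print(object_id, "->", boxes[shard_for(object_id, len(boxes))])
```

    With plain modulo sharding, changing the number of boxes remaps most ids to a different box; consistent hashing is the usual way to limit that.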

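    > Perhaps linked to this, should the
    > whole object be stored in the message
    > queue, or only the id with the body
    > stored in something like cassandra?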

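    If possible, you should store all your information directly in memory, because nothing is as fast as memory (okay, your CPU cache is even faster, but it is really small and you can't access it directly from your code). Redis keeps all your information in memory and writes snapshots to disk. I think you should be able to store all your information in memory and skip something like Cassandra altogether.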

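    Let's assume 400 bytes per object in total at a rate of 10000 objects per second => 4000000 bytes per second for all objects => 4 MB/s, if my calculation is correct. You can easily store that amount of information in memory. If you can't, you should really consider upgrading your memory if at all possible, because memory isn't that expensive anymore.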
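
    The arithmetic above can be checked with a few lines of Python. The 400-byte figure is the answer's own assumption (the question says about 300 bytes per object), the function names are made up for this sketch, and "MB" is taken as 1,000,000 bytes.

```python
OBJECT_BYTES = 400          # assumed total size per object, as in the answer
OBJECTS_PER_SECOND = 10_000


def bytes_per_second(object_bytes: int, objects_per_second: int) -> int:
    """Incoming data rate in bytes per second."""
    return object_bytes * objects_per_second


def backlog_bytes(seconds: int) -> int:
    """Memory needed if nothing is consumed for the given number of seconds."""
    return bytes_per_second(OBJECT_BYTES, OBJECTS_PER_SECOND) * seconds


if __name__ == "__main__":
    rate = bytes_per_second(OBJECT_BYTES, OBJECTS_PER_SECOND)
    print(rate, "bytes/s =", rate / 1_000_000, "MB/s")
    print("one hour of unconsumed objects:", backlog_bytes(3600) / 1_000_000_000, "GB")
```

    The 4 MB/s figure is a rate, so the memory actually needed depends on how far the consumers fall behind: one hour of unconsumed objects at this rate would be 14.4 GB.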

  • 1

    If you don't mind throwing Camel into the mix, you can use the idempotent-consumer EIP to help with this.

    Also, ActiveMQ Message Groups can be used to group related messages, which makes it easier to perform duplicate checks while still maintaining high throughput.
