首页 文章

为什么我的兔子队伍不会死?

提问于
浏览
4

我在让RabbitMQ队列过期时遇到问题 .
我正在使用RabbitMQ 3.2.4;服务器在Windows上运行,我的客户端代码在C#中 .

我尝试通过 x-expire 设置队列的自动删除参数和queue's TTL值 . 我尝试使用虚拟消息启动队列以伪造消费者的外观,我甚至尝试改变交换的持久性参数 .

在五种可能的组合中,它们都不会导致队列被删除 . 在最后一次连接关闭后,我等了几个小时(几天?),但队列不会消失 .

| auto-delete | x-expires | Prime |
| ----------- | --------- | ----- |
| false       | false     | false |  // Don't care; no delete possible
| false       | false     | true  |  // Don't care; no delete possible
| false       | true      | false |
| false       | true      | true  |
| true        | false     | false |  // Fails consumer requirement
| true        | false     | true  |
| true        | true      | false |
| true        | true      | true  |

除非队列至少有一个使用者,否则队列不会自动删除,否则队列可能会在声明后立即自动删除 .

我做错了什么/我需要做什么才能让队列过期并被删除?


如果我提出XY问题,这就是我想要做的 . 我们有一个包含工作任务的主队列 . 我们有大量 Worker 会完成任务;运行并保存计算;然后重复那个循环 . 工作人员偶尔会完成任务但不会完成任务 . 我们需要在一段时间后将任务重新排队,以便另一名工作人员可以尝试对其进行处理 .

我已经看到了许多网站/博客/建议使用重试队列以及RabbitMQ的dead-letter-exchange功能以重新排队任务 . 简化的工作流程是:
拉任务
将任务的副本推送到重试队列
执行工作
从重试队列中拉出任务副本以防止重新排队 .

如果工作人员无法提取副本,则副本将过期并重定向回主工作队列 .

整体方法有效,但问题是它会创建大量空重试队列 . 我希望删除那些重试队列 .


相关代码片段 .
我可以提供引导代码,如果's relevant but it'只是 BasicPublish 后跟 BasicGet

private ConnectionFactory factory;
private IConnection connection;
private IModel channel;

private static string MainExchange = "MainExchange";
private static string RetryExchange = "RetryExchange";
private static string MainQueue = "MainQueue";

private static int messageRequeueTTL = 30000;
private static int requeueQueueTTL = messageRequeueTTL + 15000;

factory = new ConnectionFactory() { ... }

connection = factory.CreateConnection();
channel = connection.CreateModel();

channel.ExchangeDeclare(MainExchange, ExchangeType.Topic, true);
channel.ExchangeDeclare(RetryExchange, ExchangeType.Headers, false);

channel.QueueDeclare(MainQueue, true, false, false, null);
channel.QueueBind(MainQueue, MainExchange, "");

// Populate MainQueue with several calls of: channel.BasicPublish(MainExchange, "", null, body);
// ...

// Pull a message
BasicGetResult result = channel.BasicGet(MainQueue, false);

// Logic for requeueing; Foo is my work task class
    string retryQueue = CreateRequeueName(foo.ID);

    Dictionary<string, object> queueArgs = new Dictionary<string, object>
    {
        {"x-dead-letter-exchange", MainExchange}
        ,{"x-message-ttl", messageRequeueTTL} 
    };

    Dictionary<string, object> bindArgs = new Dictionary<string, object>
    {
        {"x-match", "all"}
        ,{"key1", foo.ID}
        ,{"x-expires", requeueQueueTTL}
    };

    // Set auto delete or not here
    channel.QueueDeclare(retryQueue, false, false, false, queueArgs);

    channel.QueueBind(retryQueue, RetryExchange, "", bindArgs);

    PrimeRetryQueue(foo.ID);        

    var body = Encoding.UTF8.GetBytes(Foo.ToXML(foo));
    var props = channel.CreateBasicProperties();
    props.Headers = new Dictionary<string, object>() { { "key1", foo.ID } };

    channel.BasicPublish(RetryExchange, "", props, body);

//Acknowledge original message pulled from MainQueue
channel.BasicAck(result.DeliveryTag, false);

1 回答

  • 3

    不会删除“MainQueue”队列,因为您正在设置第4个参数autoDelete = false . 这是方法签名:

    QueueDeclareOk QueueDeclare (string queue, bool durable, bool exclusive, bool autoDelete, IDictionary arguments);
    

    如果要在关闭连接时删除队列,则需要执行以下操作:

    channel.QueueDeclare(MainQueue, true, true, true, null);
    

    在第二种情况下,使用retryQueue,您将使用"x-message-ttl"来声明它,它控制消息过期,而不是队列过期 . 发送到该队列的消息应在30秒后过期,但队列将保留 . 您还在队列绑定参数中传递"x-expires",而AFAIK在那里没有效果 . 如果您希望队列本身在30秒后过期,则应在代码中的队列声明参数queueArgs中设置此值 .

    Dictionary<string, object> queueArgs = new Dictionary<string, object>
    {
        {"x-dead-letter-exchange", MainExchange},
        {"x-expires", messageRequeueTTL} 
    };
    
    channel.QueueDeclare(retryQueue, false, false, false, queueArgs);
    

    供您参考:https://www.rabbitmq.com/ttl.html

相关问题