我在让RabbitMQ队列过期时遇到问题 .
我正在使用RabbitMQ 3.2.4;服务器在Windows上运行,我的客户端代码在C#中 .
我尝试通过 x-expire
设置队列的自动删除参数和queue's TTL值 . 我尝试使用虚拟消息启动队列以伪造消费者的外观,我甚至尝试改变交换的持久性参数 .
在五种可能的组合中,它们都不会导致队列被删除 . 在最后一次连接关闭后,我等了几个小时(几天?),但队列不会消失 .
| auto-delete | x-expires | Prime |
| ----------- | --------- | ----- |
| false | false | false | // Don't care; no delete possible
| false | false | true | // Don't care; no delete possible
| false | true | false |
| false | true | true |
| true | false | false | // Fails consumer requirement
| true | false | true |
| true | true | false |
| true | true | true |
除非队列至少有一个使用者,否则队列不会自动删除,否则队列可能会在声明后立即自动删除 .
我做错了什么/我需要做什么才能让队列过期并被删除?
如果我提出XY问题,这就是我想要做的 . 我们有一个包含工作任务的主队列 . 我们有大量 Worker 会完成任务;运行并保存计算;然后重复那个循环 . 工作人员偶尔会完成任务但不会完成任务 . 我们需要在一段时间后将任务重新排队,以便另一名工作人员可以尝试对其进行处理 .
我已经看到了许多网站/博客/建议使用重试队列以及RabbitMQ的dead-letter-exchange功能以重新排队任务 . 简化的工作流程是:
拉任务
将任务的副本推送到重试队列
执行工作
从重试队列中拉出任务副本以防止重新排队 .
如果工作人员无法提取副本,则副本将过期并重定向回主工作队列 .
整体方法有效,但问题是它会创建大量空重试队列 . 我希望删除那些重试队列 .
相关代码片段 .
我可以提供引导代码,如果's relevant but it'只是 BasicPublish
后跟 BasicGet
private ConnectionFactory factory;
private IConnection connection;
private IModel channel;
private static string MainExchange = "MainExchange";
private static string RetryExchange = "RetryExchange";
private static string MainQueue = "MainQueue";
private static int messageRequeueTTL = 30000;
private static int requeueQueueTTL = messageRequeueTTL + 15000;
factory = new ConnectionFactory() { ... }
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(MainExchange, ExchangeType.Topic, true);
channel.ExchangeDeclare(RetryExchange, ExchangeType.Headers, false);
channel.QueueDeclare(MainQueue, true, false, false, null);
channel.QueueBind(MainQueue, MainExchange, "");
// Populate MainQueue with several calls of: channel.BasicPublish(MainExchange, "", null, body);
// ...
// Pull a message
BasicGetResult result = channel.BasicGet(MainQueue, false);
// Logic for requeueing; Foo is my work task class
string retryQueue = CreateRequeueName(foo.ID);
Dictionary<string, object> queueArgs = new Dictionary<string, object>
{
{"x-dead-letter-exchange", MainExchange}
,{"x-message-ttl", messageRequeueTTL}
};
Dictionary<string, object> bindArgs = new Dictionary<string, object>
{
{"x-match", "all"}
,{"key1", foo.ID}
,{"x-expires", requeueQueueTTL}
};
// Set auto delete or not here
channel.QueueDeclare(retryQueue, false, false, false, queueArgs);
channel.QueueBind(retryQueue, RetryExchange, "", bindArgs);
PrimeRetryQueue(foo.ID);
var body = Encoding.UTF8.GetBytes(Foo.ToXML(foo));
var props = channel.CreateBasicProperties();
props.Headers = new Dictionary<string, object>() { { "key1", foo.ID } };
channel.BasicPublish(RetryExchange, "", props, body);
//Acknowledge original message pulled from MainQueue
channel.BasicAck(result.DeliveryTag, false);
1 回答
不会删除“MainQueue”队列,因为您正在设置第4个参数autoDelete = false . 这是方法签名:
如果要在关闭连接时删除队列,则需要执行以下操作:
在第二种情况下,使用retryQueue,您将使用"x-message-ttl"来声明它,它控制消息过期,而不是队列过期 . 发送到该队列的消息应在30秒后过期,但队列将保留 . 您还在队列绑定参数中传递"x-expires",而AFAIK在那里没有效果 . 如果您希望队列本身在30秒后过期,则应在代码中的队列声明参数queueArgs中设置此值 .
供您参考:https://www.rabbitmq.com/ttl.html