我正在使用 grails 3.2.3
版本和 rabbitmq native plugin 3.3.2
(http://budjb.github.io/grails-rabbitmq-native/doc/manual/) . 我试图实现以下方案 .
Description: 我_166296_在兔子的启动器上 . 所以任何帮助/方向都非常感谢 . 以下是我的代码 .
Queue config in application.groovy:
rabbitmq {
queues = [
[
name : "mail.queue",
connection: "defaultConnection",
durable : true
]
]
}
Sending to queue function:
protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
rabbitMessagePublisher.send {
routingKey = queueType.queueName
body = message
autoConvert = true
if (headers != null) {
headers = binding
}
}
}
在 sendToQueue
,我将第三个参数设为可选,因为在某些情况下我不需要多种类型的消费者;
Calling send to queue:
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])
Consumer 1:
static rabbitConfig = [
queue : QueueType.EMAIL_QUEUE.queueName,
binding : ["emailType": EmailType.PASSWORD_RESET.name()],
match : "all",
consumer: 10
]
def handleMessage(Map message, MessageContext context) {
print("From PasswordResetEmailConsumer consumer")
println(message)
passwordResetEmailService.sendPasswordResetMail(message)
}
Consumer 2:
static rabbitConfig = [
queue : QueueType.EMAIL_QUEUE.queueName,
binding : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
match : "all",
consumer: 10
]
def handleMessage(Map message, MessageContext context) {
print("From PasswordResetSuccessEmailConsumer consumer")
println(message)
passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}
1 回答
在阅读了rabbitmq文档之后,我意识到不可能有选择地从单个队列中提取消息 .
虽然还有另一个选项
"Exchange"
,其中发布者将发布消息以与路由密钥交换,并且这些消息将被传递到绑定队列 . 更多:RabbitMQ Publish/Subscribe Model基本思路也在这里描述:Stackoverflow: RabbitMQ selectively retrieving messages from a queue
无论如何,在我的解决方案中,我不想要多个队列 . 所以我创建了一个使用者并使用消息传递实际的处理程序类bean引用来分派消息 . 分享实施,希望这有助于某人:
Queue config in application.groovy:
Sending to queue function:
Handler Interface:
Sending to queue:
Queue Consumer:
Finally Handler service which implements Handler interface: