
Grails RabbitMQ native plugin: filtering messages to consumers


I am using Grails 3.2.3 and the rabbitmq native plugin 3.3.2 (http://budjb.github.io/grails-rabbitmq-native/doc/manual/). I am trying to implement the following scenario.

Description: I am new to RabbitMQ, so any help or direction is much appreciated. My code is below.

Queue config in application.groovy:

rabbitmq {
    queues = [
        [
            name      : "mail.queue",
            connection: "defaultConnection",
            durable   : true
        ]
    ]
}

Sending to queue function:

protected void sendToQueue(QueueType queueType, Map message, Map<String, String> binding = null) {
    rabbitMessagePublisher.send {
        routingKey = queueType.queueName
        body = message
        autoConvert = true
        // Only set headers when the caller actually supplied a binding
        if (binding != null) {
            headers = binding
        }
    }
}

In sendToQueue, I made the third parameter optional because in some cases I do not need multiple types of consumers.

Calling send to queue:

sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET.name()])
sendToQueue(QueueType.EMAIL_QUEUE, [user: user], ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()])

Consumer 1:

static rabbitConfig = [
        queue    : QueueType.EMAIL_QUEUE.queueName,
        binding  : ["emailType": EmailType.PASSWORD_RESET.name()],
        match    : "all",
        consumers: 10 // the plugin's key for concurrent consumers is "consumers"
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetEmailConsumer consumer")
    println(message)
    passwordResetEmailService.sendPasswordResetMail(message)
}

Consumer 2:

static rabbitConfig = [
        queue    : QueueType.EMAIL_QUEUE.queueName,
        binding  : ["emailType": EmailType.PASSWORD_RESET_SUCCESS.name()],
        match    : "all",
        consumers: 10 // the plugin's key for concurrent consumers is "consumers"
]

def handleMessage(Map message, MessageContext context) {
    print("From PasswordResetSuccessEmailConsumer consumer")
    println(message)
    passwordResetSuccessEmailService.sendPasswordResetSuccessMail(message)
}

1 Answer

  • 2

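    After reading the RabbitMQ documentation, I realized that it is not possible to selectively retrieve messages from a single queue.

    A consumer cannot filter what it receives: any message in the queue may be delivered to any consumer attached to that queue.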
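
To make this concrete: when several consumers share one queue, each message goes to only one of them, typically in round-robin order, and the headers play no part in that choice. The following is a minimal plain-Groovy simulation of that behaviour, not RabbitMQ code. The `deliverRoundRobin` helper and the sample messages are made up for illustration; the consumer names follow the question.

```groovy
// Simulates how messages from ONE queue are spread across its consumers.
// Hypothetical helper: no broker involved, and headers are never inspected.
Map<String, List<String>> deliverRoundRobin(List<Map> queue, List<String> consumers) {
    Map<String, List<String>> received = consumers.collectEntries { [(it): []] }
    queue.eachWithIndex { Map message, int i ->
        String consumer = consumers[i % consumers.size()]
        received[consumer] << (message.headers.emailType as String)
    }
    received
}

def queue = [
    [headers: [emailType: 'PASSWORD_RESET'],         body: [user: 'alice']],
    [headers: [emailType: 'PASSWORD_RESET_SUCCESS'], body: [user: 'alice']],
    [headers: [emailType: 'PASSWORD_RESET_SUCCESS'], body: [user: 'bob']],
    [headers: [emailType: 'PASSWORD_RESET'],         body: [user: 'bob']],
]

def received = deliverRoundRobin(queue, ['PasswordResetEmailConsumer', 'PasswordResetSuccessEmailConsumer'])
received.each { name, types -> println "$name got $types" }
```

In this simulation both consumers end up with both email types, which is the situation the question runs into.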

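    There is another option, though: an exchange. The publisher publishes messages to the exchange with a routing key, and the exchange delivers them to the queues bound to it. More: RabbitMQ Publish/Subscribe Model
    The basic idea is also described here: Stack Overflow: RabbitMQ selectively retrieving messages from a queue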
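
As an illustration of the exchange approach, here is a rough plain-Groovy model of how a headers exchange decides which bound queues get a copy of a message, using the `emailType` header from the question. It is a sketch, not broker code: `bindingMatches`, `routeByHeaders` and the two queue names are hypothetical. The "all"/"any" rule mirrors the `x-match` argument of RabbitMQ headers exchanges.

```groovy
// Plain-Groovy model of headers-exchange routing (no broker involved).
// A binding matches when all ("all") or at least one ("any") of its
// key/value pairs appear in the message headers.
boolean bindingMatches(Map headers, Map expected, String match = 'all') {
    if (match == 'any') {
        return expected.any { k, v -> headers[k] == v }
    }
    expected.every { k, v -> headers[k] == v }
}

// Returns the names of the queues that would receive a copy of the message.
List<String> routeByHeaders(Map headers, List<Map> bindings) {
    List<Map> matching = bindings.findAll { bindingMatches(headers, it.binding as Map, it.match as String) }
    matching.collect { it.queue as String }
}

// Hypothetical queue names: one queue per email type.
def bindings = [
    [queue: 'mail.password-reset',         binding: [emailType: 'PASSWORD_RESET'],         match: 'all'],
    [queue: 'mail.password-reset-success', binding: [emailType: 'PASSWORD_RESET_SUCCESS'], match: 'all'],
]

println routeByHeaders([emailType: 'PASSWORD_RESET'], bindings)
println routeByHeaders([emailType: 'UNKNOWN'], bindings)
```

The filtering happens between the exchange and the queues, so this approach needs one queue per message type.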
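    In my case, however, I did not want multiple queues. So I created a single consumer and put the class name of the actual handler bean into each message; the consumer uses it to dispatch the message. Sharing the implementation in the hope that it helps someone: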

    Queue config in application.groovy:

    rabbitmq {
        queues = [
            [
                    name      : "mail.queue",
                    connection: "defaultConnection",
                    durable   : true
            ]
        ]
    }
    

    Sending to queue function:

    protected void sendToQueue(Map message, QueueType queueType, Class<BaseQueueHandler> queueHandlerServiceClass) {
        message.queueHandlerServiceClass = queueHandlerServiceClass.name
        rabbitMessagePublisher.send {
            routingKey = queueType.queueName // queue name from enum: "mail.queue"
            body = message
            autoConvert = true
        }
    }
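
Since the handler is identified by a class name stored in the message body, that name has to survive serialization. The sketch below assumes the Map body travels as JSON (an assumption about what `autoConvert` does with a Map) and uses `groovy.json` directly to show the round trip. `withHandler` is a hypothetical helper and `com.example.PasswordResetEmailService` is a placeholder class name.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper: returns a copy of the message with the handler's
// class name added, leaving the caller's map untouched.
Map withHandler(Map message, String handlerClassName) {
    message + [queueHandlerServiceClass: handlerClassName]
}

def original = [user: 'alice']
def outgoing = withHandler(original, 'com.example.PasswordResetEmailService')

// What travels over the wire is just text; the class reference becomes a String.
String wire = JsonOutput.toJson(outgoing)
Map incoming = new JsonSlurper().parseText(wire) as Map

println wire
println incoming.queueHandlerServiceClass
```

Building a new map instead of assigning into `message` keeps the caller's map unchanged, which matters if the same map is reused after sending.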
    

    Handler Interface:

    interface BaseQueueHandler {
        void handleMessage(Map message, MessageContext context)
    }
    

    Sending to queue:

    sendToQueue([user: user], QueueType.EMAIL_QUEUE, PasswordResetEmailService.class)
    

    Queue Consumer:

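    class EchoEmailQueueConsumer {

        static rabbitConfig = [
                // Must be the same queue the publisher sends to ("mail.queue")
                queue    : QueueType.EMAIL_QUEUE.queueName,
                consumers: 10 // the plugin's key for concurrent consumers is "consumers"
        ]

        GrailsApplication grailsApplication

        def handleMessage(Map message, MessageContext context) {
            // Take the handler class name out of the payload before passing it on
            String handlerClass = message.remove("queueHandlerServiceClass")
            Class<BaseQueueHandler> handlerClassType = Class.forName(handlerClass)
            // Look up the Grails service bean of that type and delegate to it
            BaseQueueHandler queueService = grailsApplication.mainContext.getBean(handlerClassType)
            queueService.handleMessage(message, context)
        }

    }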
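
One thing to be aware of: with `Class.forName` on a name taken from the message, whoever can publish to the queue decides which class is loaded. A possible alternative, sketched here in plain Groovy without Grails or RabbitMQ, is to dispatch through a fixed registry of known handlers and reject anything else. `QueueHandler`, `HandlerRegistry`, the `handlerKey` field and the single-argument `handleMessage` are assumptions made for this sketch; they are not part of the plugin or of the code above.

```groovy
// Simplified stand-in for the handler interface (no MessageContext here).
interface QueueHandler {
    void handleMessage(Map message)
}

// Hypothetical registry: only handlers registered up front can be dispatched to.
class HandlerRegistry {
    private final Map<String, QueueHandler> handlers = [:]

    void register(String key, QueueHandler handler) {
        handlers[key] = handler
    }

    void dispatch(Map message) {
        Map payload = new LinkedHashMap(message)   // do not mutate the caller's map
        String key = payload.remove('handlerKey')
        QueueHandler handler = handlers[key]
        if (handler == null) {
            throw new IllegalArgumentException("No handler registered for key: ${key}")
        }
        handler.handleMessage(payload)
    }
}

def handled = []
def registry = new HandlerRegistry()
registry.register('PASSWORD_RESET', { Map m -> handled << "reset mail for ${m.user}".toString() } as QueueHandler)

registry.dispatch([handlerKey: 'PASSWORD_RESET', user: 'alice'])
println handled
```

In a Grails application the registry could presumably be filled from the service beans that implement the handler interface, so the publisher would send a short key instead of a class name.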
    

    Finally Handler service which implements Handler interface:

    class PasswordResetEmailService implements BaseQueueHandler {
    
        @Override
        void handleMessage(Map message, MessageContext context) {
            println("message received in PasswordResetEmailService")
        }
    }
    
