首页 文章

如何使用RabbitTemplate模仿SimpMessagingTemplate.convertAndSendToUser?

提问于
浏览
1

所以我一直在阅读有关RabbitMQ代理的Spring Message Relay(Spring Messaging stuff)功能 . 我想要实现的目标如下:

有一个服务(1),它充当rabbitmq和浏览器之间的消息中继 . 这现在工作正常 . 我正在使用MessageBrokerRegistry.enableStompBrokerRelay来做到这一点 .

在后端有另一个服务(2),它将一条消息发送到RabbitMQ上的已知队列,并将该消息路由到特定用户 . 作为发件人,我希望能够控制邮件传递给谁 .

通常,您可以使用SimpMessagingTemplate来执行此操作 . 但问题是,消息的来源实际上并不能访问该模板,因为它不是作为中继,它不使用websockets而且它不保存队列名称到会话ID的映射 .

我能想到的一种方法是在服务1上编写一个简单的类,它将监听所有队列并使用simp模板转发它们 . 但是我觉得这不是理想的做法,我觉得可能已经有办法用Spring做了 .

你能给些建议么?

1 回答

  • 1

    This question让我想到了我面临的同样困境 . 我已经开始使用自定义UserDestinationResolver来达到一致的主题命名方案,该方案仅使用用户名,而不使用默认解析器使用的会话ID .

    这让我可以在JS中订阅“/user/exchange/amq.direct/current-time”,但是通过一个vanilla RabbitMQ应用程序发送到“/exchange/amqp.direct/users.me.current-time”(发送给一个名为的用户) “我”) .

    最新的源代码是here,我在现有的@Configuration类中"registering"它as a @Bean .

    这是自定义 UserDestinationResolver 本身:

    public class ConsistentUserDestinationResolver implements UserDestinationResolver {
        private static final Pattern USER_DEST_PREFIXING_PATTERN =
                Pattern.compile("/user/(?<name>.+?)/(?<routing>.+)/(?<dest>.+?)");
    
        private static final Pattern USER_AUTHENTICATED_PATTERN =
                Pattern.compile("/user/(?<routing>.*)/(?<dest>.+?)");
    
        @Override
        public UserDestinationResult resolveDestination(Message<?> message) {
            SimpMessageHeaderAccessor accessor = MessageHeaderAccessor.getAccessor(message, SimpMessageHeaderAccessor.class);
    
            final String destination = accessor.getDestination();
            final String authUser = accessor.getUser() != null ? accessor.getUser().getName() : null;
    
            if (destination != null) {
                if (SimpMessageType.SUBSCRIBE.equals(accessor.getMessageType()) ||
                        SimpMessageType.UNSUBSCRIBE.equals(accessor.getMessageType())) {
                    if (authUser != null) {
                        final Matcher authMatcher = USER_AUTHENTICATED_PATTERN.matcher(destination);
                        if (authMatcher.matches()) {
                            String result = String.format("/%s/users.%s.%s",
                                    authMatcher.group("routing"), authUser, authMatcher.group("dest"));
                            UserDestinationResult userDestinationResult =
                                    new UserDestinationResult(destination, Collections.singleton(result), result, authUser);
                            return userDestinationResult;
                        }
                    }
                }
                else if (accessor.getMessageType().equals(SimpMessageType.MESSAGE)) {
                    final Matcher prefixMatcher = USER_DEST_PREFIXING_PATTERN.matcher(destination);
                    if (prefixMatcher.matches()) {
                        String user = prefixMatcher.group("name");
                        String result = String.format("/%s/users.%s.%s",
                                prefixMatcher.group("routing"), user, prefixMatcher.group("dest"));
                        UserDestinationResult userDestinationResult =
                                new UserDestinationResult(destination, Collections.singleton(result), result, user);
                        return userDestinationResult;
                    }
                }
            }
    
            return null;
        }
    }
    

相关问题