首页 文章

使用spring-websocket和rabbitmq-stomp时,不会向所有活动订阅者发送消息

提问于
浏览
0

我有一个基于 spring 腹架的网络应用程序(由 spring 靴1.5.1提供动力) . 而我正在使用带有stomp插件的Rabbitmq(3.6.6)作为全功能代理 .

根据the doc of stomp,来自/ topic /的目的地消息将被发送给所有活动订户 .

主题目标对于将每封邮件的副本传递给所有活动订阅者的简单主题目标,可以使用表单/ topic /的目标 . 主题目标支持AMQP主题交换的所有路由模式 . 发送到没有活动订户的主题目标的消息将被丢弃 .

但行为是 NOT 与我的应用程序中的上述声明一致!

我在两个浏览器中打开了同一页面 . 因此,有两个客户端连接到websocket服务器 . 他们都订阅了以 /topic/ 开头的相同目的地 .

我将消息发送到目标 /topic/<route key> 后,但只有一个客户端会收到该消息 . 两个客户端将轮流接收来自同一目的地的消息 .

在我的Spring服务器端应用程序中,我将消息发送到目的地,如下所示,

@Secured(User.ROLE_USER)
@MessageMapping("/comment/{liveid}")
@SendTo("/topic/comment-{liveid}")
public CommentMessage userComment(@DestinationVariable("liveid") String liveid,
                                     @AuthenticationPrincipal UserDetails activeUser, UserComment userComment) {
    logger.debug("Receiving comment message '{}' of live '{}' from user '{}'.",
            userComment,liveid, activeUser.getUsername());
    final User user = userService.findByUsername(activeUser.getUsername()).get();
    return CommentMessage.builder().content(userComment.getContent()).sender(user.getNickname())
            .senderAvatar(user.getAvatar()).build();
}

在我的客户端,它订阅了如下的持久主题,

$stomp.subscribe('/topic/comment-' + $scope.lives[i].id, function(payload, headers, res) {
                                // do something
                            }, {
                                'durable': true,
                                'auto-delete': false
                            });

以下是我 Spring 季应用中websocket的配置,

@Configuration
@EnableWebSocketMessageBroker

public class WebSocketConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<ExpiringSession> {

@Value("${stompBroker.host:localhost}")
String brokerHost;
@Value("${stompBroker.port:61613}")
int brokerPort;
@Value("${stompBroker.login:guest}")
String brokerLogin;
@Value("${stompBroker.passcode:guest}")
String brokerPasscode;
@Value("${stompBroker.vhost:myvhost}")
String brokerVHost;

@Override
protected void configureStompEndpoints(StompEndpointRegistry registry) {
    registry.addEndpoint("/live/ws").withSockJS();
}

@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
    registry.enableStompBrokerRelay("/topic/").setRelayHost(brokerHost).setRelayPort(
            brokerPort).setSystemLogin(brokerLogin).setSystemPasscode(brokerPasscode).setVirtualHost(brokerVHost);

    /**
     * Both of two subscribers can receive the message if using simple broker 
    registry.enableSimpleBroker("/topic/");
     */
    registry.setApplicationDestinationPrefixes("/app");
}

@Configuration
public class WebSocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
    @Override
    protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
        messages.simpDestMatchers("/app/*").hasRole("USER");
    }

    @Override
    protected boolean sameOriginDisabled() {
        return true;
    }
}
}

我配置RabbitMQ和Stomp插件有什么问题吗?使用 SimpleMessageBroker 而不是RabbitMQ时效果很好 .

1 回答

  • 0

    通过rabbimq-users group中的讨论解决了这个问题 .

    我使用了持久订阅,具有相同的ID,消费者成为有竞争力的消费者 .

    在使用持久队列或使用自动删除队列时指定不同的客户端ID解决了该问题 .

相关问题