首页 文章

spring配置嵌入式BrokerService

提问于
浏览
4

我想在 Spring MVC 应用程序中配置嵌入式 ActiveMQ 代理服务(BrokerService)。

这是我的配置

/**
 * Spring JMS configuration that declares an embedded ActiveMQ {@link BrokerService}
 * together with the connection factory, default queue and {@link JmsTemplate} beans.
 */
@Configuration
@EnableJms
public class JmsConfiguration {

/**
 * Builds the embedded broker bean. The bean definition names {@code start} and
 * {@code stop} as its init and destroy methods.
 *
 * @return a broker named "broker" with persistence, JMX and the shutdown hook switched off
 * @throws Exception declared by this method; presumably raised by {@code addConnector} — confirm
 */
@Bean(initMethod = "start", destroyMethod = "stop")
public BrokerService brokerService() throws Exception {
    BrokerService brokerService = new BrokerService();
    brokerService.setPersistent(false);
    brokerService.setUseJmx(false);
    // NOTE(review): the connector URI uses host "localhost" while the broker is named
    // "broker" on the next line — confirm that clients of "vm://localhost:0" reach this broker.
    brokerService.addConnector("vm://localhost:0");
    brokerService.setBrokerName("broker");
    brokerService.setUseShutdownHook(false);
    return brokerService;
}
/**
 * Creates the connection factory, pointed at the same URI string that is passed to
 * {@code addConnector} in {@link #brokerService()}.
 *
 * @return a new {@code ActiveMQConnectionFactory} for "vm://localhost:0"
 */
@Bean
public ConnectionFactory connectionFactory(){
    return new ActiveMQConnectionFactory("vm://localhost:0");
}

/**
 * Declares the queue that {@link #jmsTemplate()} uses as its default destination.
 *
 * @return a queue named "broker" (the same string as the broker name)
 */
@Bean
public ActiveMQQueue defaultDestination(){
    return new ActiveMQQueue("broker");
}

/**
 * Builds the template from {@link #connectionFactory()} and sets
 * {@link #defaultDestination()} as its default destination.
 *
 * @return the configured {@link JmsTemplate}
 */
@Bean
public JmsTemplate jmsTemplate(){
    JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
    jmsTemplate.setDefaultDestination(defaultDestination());
    return jmsTemplate;
}
}

以及一个用于向代理服务发送消息的简单测试 GET 请求:

private JmsTemplate jmsTemplate;

/**
 * Creates the controller and stores the injected template in the {@code jmsTemplate} field.
 *
 * @param jmsTemplate template kept for later use by this controller
 */
@Autowired
public TestController(JmsTemplate jmsTemplate){
    this.jmsTemplate = jmsTemplate;
}

/**
 * Handles GET {@code /test/jms}: sends the string "test" through {@code jmsTemplate}.
 *
 * @param locale request locale; not used by this method
 * @return "OK" when the send completes, otherwise the message of the caught exception
 */
@RequestMapping(value="/test/jms", method = RequestMethod.GET)
public String SendJMSMsg(Locale locale){
    String outcome;
    try {
        jmsTemplate.convertAndSend("test");
        outcome = "OK";
    } catch (Exception failure) {
        // Print the trace, then hand the failure text back to the caller.
        failure.printStackTrace();
        outcome = failure.getMessage();
    }
    return outcome;
}

jmsTemplate.convertAndSend抛出此异常

org.springframework.jms.UncategorizedJmsException: Uncategorized exception occured during JMS processing; nested exception is javax.jms.JMSException: org.apache.activemq.advisory.AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(Lorg/apache/activemq/command/ActiveMQDestination;)Z
    at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
    at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:497)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:569)
    at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:560)
    at si.lapps.fabijan.controller.AdminController.SendJMSMsg(AdminController.java:41)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:221)
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136)
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:110)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:817)
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:731)
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:85)
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:959)
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:893)
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:968)
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:859)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:621)
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:844)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:728)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:305)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210)
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:51)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:243)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:210)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:222)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:123)
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:502)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:171)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:100)
    at org.apache.catalina.valves.AccessLogValve.invoke(AccessLogValve.java:953)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:118)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:408)
    at org.apache.coyote.http11.AbstractHttp11Processor.process(AbstractHttp11Processor.java:1041)
    at org.apache.coyote.AbstractProtocol$AbstractConnectionHandler.process(AbstractProtocol.java:603)
    at org.apache.tomcat.util.net.JIoEndpoint$SocketProcessor.run(JIoEndpoint.java:310)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: javax.jms.JMSException: org.apache.activemq.advisory.AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(Lorg/apache/activemq/command/ActiveMQDestination;)Z
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:54)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1401)
    at org.apache.activemq.AdvisoryConsumer.<init>(AdvisoryConsumer.java:51)
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1513)
    at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:324)
    at org.springframework.jms.support.JmsAccessor.createSession(JmsAccessor.java:192)
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:485)
    ... 39 more
Caused by: java.lang.NoSuchMethodError: org.apache.activemq.advisory.AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic(Lorg/apache/activemq/command/ActiveMQDestination;)Z
    at org.apache.activemq.advisory.AdvisoryBroker.addConsumer(AdvisoryBroker.java:205)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:103)
    at org.apache.activemq.broker.BrokerFilter.addConsumer(BrokerFilter.java:103)
    at org.apache.activemq.broker.MutableBrokerFilter.addConsumer(MutableBrokerFilter.java:108)
    at org.apache.activemq.broker.TransportConnection.processAddConsumer(TransportConnection.java:671)
    at org.apache.activemq.command.ConsumerInfo.visit(ConsumerInfo.java:351)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:338)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:188)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
    at org.apache.activemq.transport.vm.VMTransport.iterate(VMTransport.java:271)
    at org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:133)
    at org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:48)

有人可以帮我解决这个问题吗?如果我使用外部 ActiveMQ,一切都工作正常。

2 回答

  • 1

    我也遇到了这个问题,结果发现是某个 activemq jar 中的依赖管理把 activemq-client 强制降级到了一个不包含 AdvisorySupport.isVirtualDestinationConsumerAdvisoryTopic 方法的版本。在我的情况下,所需的 5.13.2 被降级为 5.12.2。

    我的 build.gradle 包含了这个:

    // Only broker and stomp are declared here; activemq-client arrives transitively.
    compile 'org.apache.activemq:activemq-broker:5.13.2'
    compile 'org.apache.activemq:activemq-stomp:5.13.2'
    

    和依赖是:

    +--- org.apache.activemq:activemq-broker:5.13.2
    |    +--- org.apache.activemq:activemq-client:5.13.2 -> 5.12.2
    |    |    +--- org.slf4j:slf4j-api:1.7.10 -> 1.7.13
    |    |    +--- org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1
    |    |    +--- org.fusesource.hawtbuf:hawtbuf:1.11
    |    |    \--- org.apache.geronimo.specs:geronimo-j2ee-management_1.1_spec:1.0.1
    |    \--- org.apache.activemq:activemq-openwire-legacy:5.13.2 -> 5.12.2
    |         \--- org.apache.activemq:activemq-client:5.12.2 (*)
    \--- org.apache.activemq:activemq-stomp:5.13.2
         \--- org.apache.activemq:activemq-broker:5.13.2 (*)
    

    为我修复的是在 build.gradle 中指定activemq客户端版本:

    compile 'org.apache.activemq:activemq-broker:5.13.2'
    // Declared explicitly so activemq-client resolves to 5.13.2 instead of 5.12.2.
    compile 'org.apache.activemq:activemq-client:5.13.2'
    compile 'org.apache.activemq:activemq-stomp:5.13.2'
    

    依赖关系现在看起来像:

    +--- org.apache.activemq:activemq-broker:5.13.2
    |    +--- org.apache.activemq:activemq-client:5.13.2
    |    |    +--- org.slf4j:slf4j-api:1.7.13
    |    |    +--- org.apache.geronimo.specs:geronimo-jms_1.1_spec:1.1.1
    |    |    +--- org.fusesource.hawtbuf:hawtbuf:1.11
    |    |    \--- org.apache.geronimo.specs:geronimo-j2ee-management_1.1_spec:1.0.1
    |    \--- org.apache.activemq:activemq-openwire-legacy:5.13.2 -> 5.12.2
    |         \--- org.apache.activemq:activemq-client:5.12.2 -> 5.13.2 (*)
    +--- org.apache.activemq:activemq-client:5.13.2 (*)
    \--- org.apache.activemq:activemq-stomp:5.13.2
         \--- org.apache.activemq:activemq-broker:5.13.2 (*)
    

    并且错误不再发生 .

    重要提示:activemq 项目的维护者做这样的依赖管理想必有其原因(我怀疑它与 org.apache.activemq:activemq-openwire-legacy 项目有关),因此请准备好修复可能出现的任何兼容性问题。

  • 4

    Adrey Brown 的答案解决了最初的异常问题,但我的监听器(listener)没有收到任何消息。我的配置有什么问题?

    这是我的bean配置

    /**
     * Spring JMS configuration that declares an embedded ActiveMQ {@link BrokerService}
     * on a TCP connector, plus the connection factory, default queue, {@link JmsTemplate},
     * listener container and listener beans.
     */
    @Configuration
    @EnableJms
    @ComponentScan("si.xxxx.yyyyy.jms")
    public class JmsConfiguration{
    
        /**
         * Builds the embedded broker bean. No init or destroy method is named on the
         * bean definition (the attributes are commented out).
         *
         * @return a broker named "broker" with persistence, JMX and the shutdown hook switched off
         * @throws Exception declared by this method; presumably raised by {@code addConnector} — confirm
         */
        @Bean//(initMethod = "start", destroyMethod = "stop")
        public BrokerService brokerService() throws Exception {
            BrokerService brokerService = new BrokerService();
            brokerService.setPersistent(false);
            brokerService.setUseJmx(false);
            //brokerService.addConnector("vm://localhost:0");
            brokerService.addConnector("tcp://localhost:61616");
            brokerService.setBrokerName("broker");
            brokerService.setUseShutdownHook(false);
            return brokerService;
        }
        /**
         * Creates the connection factory, pointed at the same URI string that is passed
         * to {@code addConnector} in {@link #brokerService()}.
         *
         * @return an {@code ActiveMQConnectionFactory} for "tcp://localhost:61616"
         */
        @Bean
        public ConnectionFactory connectionFactory(){
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            // NOTE(review): trusting all packages presumably lifts the class restrictions on
            // ObjectMessage payloads — confirm this is acceptable outside of local testing.
            factory.setTrustAllPackages(true);
            return factory;
        }
    
        /**
         * Declares the queue shared by {@link #jmsTemplate()} and the listener container.
         *
         * @return a queue named "email"
         */
        @Bean
        public ActiveMQQueue defaultDestination(){
            return new ActiveMQQueue("email");
        }
    
        /**
         * Builds the template from {@link #connectionFactory()} and sets
         * {@link #defaultDestination()} as its default destination.
         *
         * @return the configured {@link JmsTemplate}
         */
        @Bean
        public JmsTemplate jmsTemplate(){
            JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
            jmsTemplate.setDefaultDestination(defaultDestination());
            return jmsTemplate;
        }
    
        /**
         * Builds a listener container wired to the given connection factory, to
         * {@link #defaultDestination()} and to {@link #messageListener()}.
         *
         * <p>NOTE(review): despite its name, this bean is a
         * {@code DefaultMessageListenerContainer}, not a listener container factory —
         * confirm nothing looks it up expecting a factory.
         *
         * @param connectionFactory connection factory the container will use
         * @return the configured container
         */
        @Bean
        public DefaultMessageListenerContainer jmsListenerContainerFactory(ConnectionFactory connectionFactory){
            DefaultMessageListenerContainer containerFactory = new DefaultMessageListenerContainer();
            containerFactory.setConnectionFactory(connectionFactory);
            containerFactory.setDestination(defaultDestination());
            containerFactory.setMessageListener(messageListener());
            return containerFactory;
        }
    
        /**
         * Creates the listener instance handed to the container above.
         *
         * @return a new {@code MessageListener} (a class that can be instantiated directly)
         */
        @Bean
        public MessageListener messageListener(){
            return new MessageListener();
        }
    }
    

    寄件人

    /**
     * Service that publishes messages through the injected {@link JmsTemplate}.
     */
    @Service
    public class MessageSender {
    
        /** Template used for every send; assigned once in the constructor and never replaced. */
        private final JmsTemplate jmsTemplate;
    
        /**
         * Creates the sender.
         *
         * @param jmsTemplate template used to send messages
         */
        @Autowired
        public MessageSender(JmsTemplate jmsTemplate){
            this.jmsTemplate = jmsTemplate;
        }
    
        /**
         * Sends the given message via {@code jmsTemplate.convertAndSend(message)},
         * without naming a destination explicitly.
         *
         * @param message the new-user message to send
         */
        public void sendUserConfirmationEmail(NewUserMessage message){
            jmsTemplate.convertAndSend(message);
        }
    }
    

    和监听器

    @Service
    public class MessageListener implements javax.jms.MessageListener {

    private static final Logger logger = LoggerFactory.getLogger(MessageListener.class);
    
        /**
         * Handles one incoming JMS message.
         *
         * <p>Only {@code ObjectMessage}s are processed; any other message type is logged
         * and skipped instead of failing on a cast. The payload is read once, and when it
         * is a {@code NewUserMessage} its receipt is logged so delivery is observable.
         *
         * @param message the delivered message
         */
        @Override
        public void onMessage(Message message) {
            // instanceof is also false for null, so this guards both cases.
            if (!(message instanceof ObjectMessage)) {
                logger.warn("Ignoring JMS message that is not an ObjectMessage: {}", message);
                return;
            }
            try {
                // Read the payload a single time rather than once per use.
                Object payload = ((ObjectMessage) message).getObject();
                if (payload instanceof NewUserMessage) {
                    NewUserMessage newUserMessage = (NewUserMessage) payload;
                    logger.debug("Received NewUserMessage: {}", newUserMessage);
                }
            } catch (JMSException e) {
                logger.error("Failed to read the payload of a JMS ObjectMessage", e);
            }
        }
    }
    

相关问题