首页 文章

两个JMS代理之间的XA事务(ActiveMQ)

提问于
浏览
0

我试图在2个不同的,远程的,activeMQ代理之间移动jms消息,经过大量的阅读

我正在使用Atomikos,因为我正在编写一个独立的应用程序,而且我也使用spring来完成所有工作 .

我有以下bean javaconfig设置

@Bean(name="atomikosSrcConnectionFactory")
    public AtomikosConnectionFactoryBean consumerXAConnectionFactory() {
        AtomikosConnectionFactoryBean consumerBean = new AtomikosConnectionFactoryBean();
        consumerBean.setUniqueResourceName("atomikosSrcConnectionFactory");
        consumerBean.setLocalTransactionMode(false);
        return consumerBean;
    }

    @Bean(name="atomikosDstConnectionFactory")
    public AtomikosConnectionFactoryBean producerXAConnectionFactory() {
        AtomikosConnectionFactoryBean producerBean = new AtomikosConnectionFactoryBean();
        producerBean.setUniqueResourceName("atomikosDstConnectionFactory");
        producerBean.setLocalTransactionMode(false);
        return producerBean;
    }

    @Bean(name="jtaTransactionManager")
    public JtaTransactionManager jtaTransactionManager() throws SystemException {
        JtaTransactionManager jtaTM = new JtaTransactionManager();
        jtaTM.setTransactionManager(userTransactionManager());
        jtaTM.setUserTransaction(userTransactionImp());
        return jtaTM;
    }

    @Bean(initMethod="init", destroyMethod="close", name="userTransactionManager")
    public UserTransactionManager userTransactionManager() {
        UserTransactionManager utm = new UserTransactionManager();
        utm.setForceShutdown(false);
        return utm;
    }

    @Bean(name="userTransactionImp")
    public UserTransactionImp userTransactionImp() throws SystemException {
        UserTransactionImp uti = new UserTransactionImp();
        uti.setTransactionTimeout(300);
        return uti;
    }

    @Bean(name="jmsContainer")
    @Lazy(value=true)
    public DefaultMessageListenerContainer jmsContainer() throws SystemException {
        DefaultMessageListenerContainer dmlc = new DefaultMessageListenerContainer();
        dmlc.setAutoStartup(false);
        dmlc.setTransactionManager(jtaTransactionManager());
        dmlc.setSessionTransacted(true);
        dmlc.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
        dmlc.setConnectionFactory(consumerXAConnectionFactory());
        dmlc.setDestinationName("srcQueue");
        return dmlc;
    }

    @Bean(name="transactedJmsTemplate")
    public JmsTemplate transactedJmsTemplate() {

        DynamicDestinationResolver dest = new DynamicDestinationResolver();

        JmsTemplate jmsTmp = new JmsTemplate(producerXAConnectionFactory());

        jmsTmp.setDeliveryPersistent(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDestinationResolver(dest);
        jmsTmp.setPubSubDomain(false);
        jmsTmp.setReceiveTimeout(20000);
        jmsTmp.setExplicitQosEnabled(true);
        jmsTmp.setSessionTransacted(true);
        jmsTmp.setDefaultDestination(new ActiveMQQueue("destQueue"));
        jmsTmp.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);

        return jmsTmp;
    }

在启动DMLC之前,2 AtomikosConnectionFactoryBean在运行时包装ActiveMQXAConnectionFactory(每个代理一个) .

然后我设置了一个简单的messageListener(在启动之前分配给dmlc),使用以下方法:

public void onMessage(Message message) {
    final Message rcvedMsg = message;

    try{
        MessageCreator msgCreator = new MessageCreator(){
                public Message createMessage(Session session) throws JMSException{
                    Message returnMsg = null;
                    if(rcvedMsg instanceof TextMessage){
                        TextMessage txtMsg = session.createTextMessage();
                        txtMsg.setText(((TextMessage) rcvedMsg).getText());
                        returnMsg = txtMsg;
                    }
                    else if(rcvedMsg instanceof BytesMessage){
                        BytesMessage bytesMsg = session.createBytesMessage();
                        if(!(((BytesMessage) rcvedMsg).getBodyLength() > Integer.MAX_VALUE)){
                            byte[] bodyContent = new byte[(int) ((BytesMessage) rcvedMsg).getBodyLength()];
                            bytesMsg.writeBytes(bodyContent);
                            returnMsg = bytesMsg;
                        }
                    }
                    return returnMsg;
                }
            };

            jmsTemplate.send(msgCreator);
    }
    catch(JmsException | JMSException e){
        logger.error("Error when transfering message: '{}'. {}",message,e);
    }
}

应用程序启动时没有任何特定的错误或警告但是只要我在源队列中放入一条消息,我就可以通过日志看到onMessage方法正在反复运行同一条消息,就好像事务一直存在回滚并重新启动(任何地方都没有错误) .

我还注意到,如果我碰巧使用相同的源和目标URL(意思是相同的代理,但每个代理都有自己的connectionFactory),它的工作原理和消息在源队列和目标队列之间按预期传输 .

我想知道的是

  • 我在设置中做错了什么?为什么我的事务"seemingly"在使用2个不同的代理时反复回滚,但在使用相同的(但超过2个不同的连接工厂)时工作?

  • 我不完全相信onMessage当前正在进行正确的事务,因为我正在捕获所有异常并且什么也不做,我相信这将在jmstemplate完成发送消息之前提交dmlc的事务,但我不确定 . 如果是这种情况,SessionAwareMessageListener会更好吗?我应该在onMessage方法中设置@Transacted吗?

有人可以帮助阐明这个问题吗?欢迎所有输入 .

更新:

我意识到“回滚”的问题是由于我使用的两个AMQ都是通过经纪人网络相互连接的事实,我碰巧使用相同的队列名称来源和目的地 . 这导致了这样的事实:应用程序将消息从一个AMQ转移到另一个AMQ然后立即转发,因为源AMQ上有消费者,消息将被转移回原始AMQ,而AMQ又被视为我的应用程序的新消息再次转移,循环无限循环 . 下面发布的解决方案有助于解决其他问题

1 回答

  • 0
    try {
       ... Code
    } catch (JmsException je) {
        logger.error("Error when transfering message: '{}'. {}",message,e);
    }
    

    上面的代码吞噬了异常,您应该不捕获异常或重新抛出,以便事务管理可以适当地处理它 . 目前没有看到异常,执行提交会导致奇怪的结果 .

    我会做类似下面的事情, JmsException 来自Spring,并且,正如Spring中的大多数例外, RuntimeException . 简单地说,还要记录异常堆栈跟踪,正确删除日志语句中的第二个 {} .

    try {
       ... Code
    } catch (JmsException je) {
        logger.error("Error when transfering message: '{}'.",message,e);
        throw je;
    }
    

    但是,这将复制日志记录,因为Spring也会记录错误 .

    对于 JMSException 做这样的事情,将其转换为 JmsException .

    try {
       ... Code
    } catch (JMSException je) {
        logger.error("Error when transfering message: '{}'.",message,e);
        throw JmsUtils.convertJmsAccessException(je);
    }
    

    要获得有关所发生情况的更多信息,您可能希望为 org.springframework.jms 包启用DEBUG日志记录 . 这将使您深入了解发送/接收消息时发生的情况 .

    您使用事务会话和手动确认消息的另一件事,但是您不在代码中执行 message.acknowledge() . 由于JTA事务,Spring不会调用它 . 尝试将其切换为 SESSION_TRANSACTED . 至少对于 DMLC .

相关问题