首页 文章

JMS Poller Transactional

提问于
浏览
1

我正在使用Spring Integration 4.1.5并试图对事务做一些事情,但遗憾的是我无法找到工作示例 . 我正在尝试设置正在查找消息的JMS轮询器 . 收到消息后,服务激活器会在数据库中插入一行,并将消息传递给另一个服务激活器 . 我想制作前两个部分,消息提取和数据库插入事务 . 我不希望其余的流程是事务性的 . 我使用Weblogic作为应用程序容器,因此将使用WebLogicJtaTransactionManager .

我遇到的问题是我无法使前两件事交易 . 它要么全部要么全无 . 我尝试了很多方法,但我觉得在轮询器上使用建议链是最好的选择 . 我将能够控制哪些方法将成为事务的一部分 .

我见过使用消息驱动的监听器的例子,但是我使用的是Weblogic并且正在使用工作管理器,我相信我必须使用轮询器来利用工作管理器(如果不是这样的话,我猜这是另一个对未来的问题!)

我已经使用了xml并对其进行了简化,但除了编辑包名外,上下文也会产生问题 .

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
    xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/file 
    http://www.springframework.org/schema/integration/file/spring-integration-file.xsd 
    http://www.springframework.org/schema/integration/sftp 
    http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd 
    http://www.springframework.org/schema/integration/xml 
    http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
    http://www.springframework.org/schema/tx
    http://www.springframework.org/schema/tx/spring-tx.xsd
    http://www.springframework.org/schema/aop
    http://www.springframework.org/schema/aop/spring-aop.xsd">


    <bean id="jtaTransactionManager"
        class="org.springframework.transaction.jta.WebLogicJtaTransactionManager">
        <property name="transactionManagerName" value="javax.transaction.TransactionManager" />
    </bean>

    <bean id="insertMessageToDb" class="com.ReadMsgFromAxway" />
    <bean id="serviceActivator" class="com.CreateTMDFile" />

    <int-jms:inbound-channel-adapter id="jmsDefaultReceiver"
        connection-factory="inboundDefaultAdaptorConnectionFactory"
        extract-payload="false" destination="inboundAdaptorDefaultListenerQueue"
        channel="inboundJMS" acknowledge="transacted">  
        <int:poller id="poller" 
            max-messages-per-poll="100" fixed-rate="10">
            <int:advice-chain>
                <ref bean="txAdvice" />
            </int:advice-chain>
        </int:poller>
    </int-jms:inbound-channel-adapter>


    <tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
        <tx:attributes>
            <tx:method name="processMessage" propagation="REQUIRED"/>
        </tx:attributes>
    </tx:advice>

    <aop:config>
        <aop:pointcut id="txOperation" 
            expression="execution(* axway.ReadMsgFromAxway.processMessage(..))"/>
        <aop:advisor advice-ref="txAdvice" pointcut-ref="txOperation" />
    </aop:config> 

    <int:service-activator input-channel="inboundJMS"
        output-channel="serviceActivatorChannel" ref="insertMessageToDb" method="processMessage" />

    <int:chain input-channel="serviceActivatorChannel" output-channel="nullChannel">
        <int:service-activator ref="serviceActivator" />
    </int:chain>

</beans>

ReadMsgFromAxway.java

public Message<File> processMessage(Message<?> message)  {
//Insert into DB
        trackerProcess.insertUpdateMessageTracker(message, "axwayChannel",
                "axwayChannel", currentStatusID, null, null);
        count++;
        int mod = count % 2;
        if (mod != 0) {
            // pass every 2
            String hello = "hey";
        } else {
            throw new RuntimeException("Testing transactional");
        }

        Message<File> springMessage = MessageBuilder.createMessage(payloadFile,
                messageHeaders);
        return springMessage;
    }

无论是抛出运行时异常还是在下一个服务激活器组件抛出异常,XML都不会执行任何操作 .

如果我将建议属性更改为

<tx:method name="*" propagation="REQUIRED"/>

然后,第1和第2服务激活器的异常导致回滚 .

奇怪的是,如果我这样做

<tx:method name="processMessage" propagation="REQUIRED"/>
        <tx:method name="*" propagation="NEVER"/>

然后,是否抛出第一个服务激活器或第二个激活器中的运行时异常,消息将被回滚 . 我认为切入点会限制哪个类会导致交易,但我可能会误解某些事情 .

另一个注意事项 - 这个应用程序被安置在一个带有其他几个战争的耳朵文件中 . 此上下文启动整个入站进程,并通过JMS队列连接到包含业务逻辑的另一个war . 在使用方法name =“*”的场景中,我看到了业务逻辑战中的异常导致原始入站消息的JMS消息也被回滚 . 我的印象是第二次战争将在另一个线程中进行处理,因为它通过队列接收消息,因此不属于事务的一部分 . 这可能是JTA的副作用,它是容器管理的吗?

谢谢!

1 回答

  • 0

    我'd recommend you to read Dave Syer' s article关于交易,你可以在"More like this"找到一个链接 .

    现在看起来你根本不了解交易和AOP . 您应该在Spring Framework中更加关注AOP支持 .

    一般来说,声明式事务( @Transactional on method)是AOP建议的特例 . 任何AOP背后的主要概念是由方法调用产生的调用堆栈边界,我们为其指定了一个建议 .

    但是如果目标对象中没有这样的方法,它将不会被建议并包装到AOP代理 . 就像 processMessage 的情况一样, txAdviceorg.springframework.messaging.Message.MessageSource 周围的某个内部对象应用 JmsDestinationPollingSource 作为 <int-jms:inbound-channel-adapter> 的合约 .

    相反,您可以使用 <transactional><transactional> 配置 . 对:所有的下游流程都将由交易覆盖 .

    在这种情况下为该内部完成交易

    Callable<Boolean> pollingTask = new Callable<Boolean>() {
    
            @Override
            public Boolean call() throws Exception {
                return doPoll();
            }
        };
    

    receive()handleMessage() 周围,您应该像常规 @Transactional 情况一样完成方法调用 . 为此,我们应该将下一个消息处理到另一个线程 . 只是因为默认情况下所有 <channel> 都是 DirectChannel 并且与当前调用堆栈绑定 .

    为此,您可以使用 ExecutorChannel<int:dispatcher task-executor="threadPoolExecutor"/> )作为 serviceActivatorChannel .

    从那里你不需要其余的AOP配置 .

    不确定您的其他网络应用程序的第二个问题 . 看起来它有一些逻辑可以挽回它的工作,以防你身边出现一些不一致的情况 . 但无论如何,这看起来像一个不同的问题 .

相关问题