我正在使用Spring Integration 4.1.5并试图对事务做一些事情,但遗憾的是我无法找到工作示例 . 我正在尝试设置正在查找消息的JMS轮询器 . 收到消息后,服务激活器会在数据库中插入一行,并将消息传递给另一个服务激活器 . 我想制作前两个部分,消息提取和数据库插入事务 . 我不希望其余的流程是事务性的 . 我使用Weblogic作为应用程序容器,因此将使用WebLogicJtaTransactionManager .
我遇到的问题是我无法使前两件事交易 . 它要么全部要么全无 . 我尝试了很多方法,但我觉得在轮询器上使用建议链是最好的选择 . 我将能够控制哪些方法将成为事务的一部分 .
我见过使用消息驱动的监听器的例子,但是我使用的是Weblogic并且正在使用工作管理器,我相信我必须使用轮询器来利用工作管理器(如果不是这样的话,我猜这是另一个对未来的问题!)
我已经使用了xml并对其进行了简化,但除了编辑包名外,上下文也会产生问题 .
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/sftp
http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/xml
http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd">
<bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.WebLogicJtaTransactionManager">
<property name="transactionManagerName" value="javax.transaction.TransactionManager" />
</bean>
<bean id="insertMessageToDb" class="com.ReadMsgFromAxway" />
<bean id="serviceActivator" class="com.CreateTMDFile" />
<int-jms:inbound-channel-adapter id="jmsDefaultReceiver"
connection-factory="inboundDefaultAdaptorConnectionFactory"
extract-payload="false" destination="inboundAdaptorDefaultListenerQueue"
channel="inboundJMS" acknowledge="transacted">
<int:poller id="poller"
max-messages-per-poll="100" fixed-rate="10">
<int:advice-chain>
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int-jms:inbound-channel-adapter>
<tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
<tx:attributes>
<tx:method name="processMessage" propagation="REQUIRED"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:pointcut id="txOperation"
expression="execution(* axway.ReadMsgFromAxway.processMessage(..))"/>
<aop:advisor advice-ref="txAdvice" pointcut-ref="txOperation" />
</aop:config>
<int:service-activator input-channel="inboundJMS"
output-channel="serviceActivatorChannel" ref="insertMessageToDb" method="processMessage" />
<int:chain input-channel="serviceActivatorChannel" output-channel="nullChannel">
<int:service-activator ref="serviceActivator" />
</int:chain>
</beans>
ReadMsgFromAxway.java
public Message<File> processMessage(Message<?> message) {
//Insert into DB
trackerProcess.insertUpdateMessageTracker(message, "axwayChannel",
"axwayChannel", currentStatusID, null, null);
count++;
int mod = count % 2;
if (mod != 0) {
// pass every 2
String hello = "hey";
} else {
throw new RuntimeException("Testing transactional");
}
Message<File> springMessage = MessageBuilder.createMessage(payloadFile,
messageHeaders);
return springMessage;
}
无论是抛出运行时异常还是在下一个服务激活器组件抛出异常,XML都不会执行任何操作 .
如果我将建议属性更改为
<tx:method name="*" propagation="REQUIRED"/>
然后,第1和第2服务激活器的异常导致回滚 .
奇怪的是,如果我这样做
<tx:method name="processMessage" propagation="REQUIRED"/>
<tx:method name="*" propagation="NEVER"/>
然后,是否抛出第一个服务激活器或第二个激活器中的运行时异常,消息将被回滚 . 我认为切入点会限制哪个类会导致交易,但我可能会误解某些事情 .
另一个注意事项 - 这个应用程序被安置在一个带有其他几个战争的耳朵文件中 . 此上下文启动整个入站进程,并通过JMS队列连接到包含业务逻辑的另一个war . 在使用方法name =“*”的场景中,我看到了业务逻辑战中的异常导致原始入站消息的JMS消息也被回滚 . 我的印象是第二次战争将在另一个线程中进行处理,因为它通过队列接收消息,因此不属于事务的一部分 . 这可能是JTA的副作用,它是容器管理的吗?
谢谢!
1 回答
我'd recommend you to read Dave Syer' s article关于交易,你可以在"More like this"找到一个链接 .
现在看起来你根本不了解交易和AOP . 您应该在Spring Framework中更加关注AOP支持 .
一般来说,声明式事务(
@Transactional
on method)是AOP建议的特例 . 任何AOP背后的主要概念是由方法调用产生的调用堆栈边界,我们为其指定了一个建议 .但是如果目标对象中没有这样的方法,它将不会被建议并包装到AOP代理 . 就像
processMessage
的情况一样,txAdvice
为org.springframework.messaging.Message.MessageSource
周围的某个内部对象应用JmsDestinationPollingSource
作为<int-jms:inbound-channel-adapter>
的合约 .相反,您可以使用
<transactional>
的<transactional>
配置 . 对:所有的下游流程都将由交易覆盖 .在这种情况下为该内部完成交易
在
receive()
和handleMessage()
周围,您应该像常规@Transactional
情况一样完成方法调用 . 为此,我们应该将下一个消息处理到另一个线程 . 只是因为默认情况下所有<channel>
都是DirectChannel
并且与当前调用堆栈绑定 .为此,您可以使用
ExecutorChannel
(<int:dispatcher task-executor="threadPoolExecutor"/>
)作为serviceActivatorChannel
.从那里你不需要其余的AOP配置 .
不确定您的其他网络应用程序的第二个问题 . 看起来它有一些逻辑可以挽回它的工作,以防你身边出现一些不一致的情况 . 但无论如何,这看起来像一个不同的问题 .