我有一个自我执行的jar程序,它很大程度上依赖于Spring Integration . 我遇到的问题是 program is terminating before the other Spring beans have completely finished .
下面是我正在使用的代码的简化版本,如果需要,我可以提供更多代码/配置 . 入口点是一个main()方法,它引导Spring并启动导入过程:
public static void main(String[] args) {
ctx = new ClassPathXmlApplicationContext("flow.xml");
DataImporter importer = (DataImporter)ctx.getBean("MyImporterBean");
try {
importer.startImport();
} catch (Exception e) {
e.printStackTrace();
} finally {
ctx.close();
}
}
DataImporter包含一个简单的循环,用于将消息发送到Spring Integration网关 . 这为流程提供了一种主动的“推送”方法,而不是轮询数据的常用方法 . 这是我的问题所在:
public void startImport() throws Exception {
for (Item item : items) {
gatewayBean.publish(item);
Thread.sleep(200); // Yield period
}
}
为了完整起见,流XML看起来像这样:
<gateway default-request-channel="inChannel" service-interface="GatewayBean" />
<splitter input-channel="inChannel" output-channel="splitChannel" />
<payload-type-router input-channel="splitChannel">
<mapping type="Item" channel="itemChannel" />
<mapping type="SomeOtherItem" channel="anotherChannel" />
</payload-type-router>
<outbound-channel-adapter channel="itemChannel" ref="DAOBean" method="persist" />
流程有效地启动和处理项目,但是一旦startImport()循环结束,主线程终止并立即拆除所有Spring Integration线程 . 这导致竞争条件,当程序终止时,最后(n)项未被完全处理 .
我有一个想法是保持我正在处理的项目的引用计数,但事实证明这非常复杂,因为流程经常将消息拆分/路由到多个服务激活器 - 这意味着很难确定每个项目是否具有“完成” .
我认为我需要的是检查没有Spring bean仍在执行的方法,或者标记发送到网关的所有项目在终止之前已经完全处理 .
My question is, how might I go about doing either of these, or is there a better approach to my problem I haven't thought of?
2 回答
您没有在此处使用请求 - 响应模式 .
outbound-channel-adapter是一个fire and forget操作,如果你想等待响应,你应该使用一个等待响应的出站网关,并将响应连接到原来的网关,然后在java sendAndReceive中不仅仅是发布 .
如果你可以得到一个
Item
来确定它是否仍然需要(processingFinished()或在后端阶段执行的类似事件),你可以在中央机关注册所有Item
,它可以跟踪数字未完成的Item
并且有效地确定了终止条件 .如果这种方法可行,您甚至可以考虑将项目打包成
FutureTask
-objects或使用java.util.concurrent
中的类似概念 .编辑:第二个想法:
你有没有想过让 Channels 变得更聪明?一旦发送者不再发送任何数据,发送者就会关闭该 Channels . 在这种情况下,worker-bean不一定是deamon线程,但可以根据封闭和空的输入通道确定它们的终止条件 .