我正在使用Spring Boot 1.5.13.RELEASE和Spring Integration 4.3.16.RELEASE开发应用程序 .
我是Spring Integration的新手,我遇到了一个问题 .
所以基本的想法是,在一些外部触发器(可能是和HTTP调用)上,我需要创建一个IntegrationFlow,它将使用来自rabbitMQ队列的消息,与'em一起工作,然后(可能)生成另一个rabbitMQ endpoints .
现在这应该发生很多次,所以我将不得不创建 multiple IntegrationFlows .
我正在使用 IntegrationFlowContext 来注册每个IntegrationFlow,如下所示:
IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();
我必须澄清这可以同时发生,同时创建 multiple IntegrationFlows .
所以每次我尝试创建IntegrationFlow时,我的“源”都是一个看起来像这样的网关:
MessagingGatewaySupport sourceGateway = Amqp
.inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
.concurrentConsumers(1)
.adviceChain(retryInterceptor)
.autoStartup(false)
.id("sgX-" + uuid)
.get();
它不是@Bean(还),但我希望它在每个IntegrationFlow注册时都能注册 .
我的“目标”是AmqpOutBoundAdapter,如下所示:
@Bean
public AmqpOutboundEndpoint outboundAdapter(
RabbitTemplate rabbitTemplate,
ApplicationMessagingProperties applicationMessagingProperties
) {
return Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("someStandardExchange")
.routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
.get();
}
现在这个已经是 IS ,并且每次我尝试创建流时都会注入 .
我的流程看起来像这样:
public IntegrationFlow configure() {
return IntegrationFlows
.from(sourceGateway)
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
.filter(injectedGenericSelectorFilter)
.<HashMap<String, String>>handle((payload, headers) -> {
String uuid = payload.get("uuid");
boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
myInjectedApplicationService.handlePayload(payload);
return MessageBuilder
.withPayload(payload)
.setHeader("shouldForward", shouldForwardMessage)
.setHeader("rabbitmq.ROUTING_KEY", uuid)
.build();
})
.filter("headers.get('shouldForward').equals(true)")
.transform(Transformers.toJson(jsonObjectMapper))
.handle(outboundAdapter)
.get();
}
我的问题是,当应用程序启动正常并创建第一个IntegrationFlows等 . 后来,我遇到了这种例外:
java.lang.IllegalStateException:无法在bean名称'org.springframework.integration.transformer.MessageTransformingHandler#872'下注册对象[org.springframework.integration.transformer.MessageTransformingHandler#872]:已有对象[org.springframework . integration.transformer.MessageTransformingHandler#872]绑定
我甚至尝试为每个使用的组件设置一个id,它应该被用作beanName,如下所示:
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))
但是,即使解决了像.filter这样的组件的bean名称问题,我仍然会得到关于MessageTransformingHandler的相同异常 .
更新
我没有提到这样一个事实,即每个 IntegrationFlow
完成它的工作后,它会被使用 IntegrationFlowContext
删除,如下所示:
flowContext.remove(flowId);
所以似乎有(有点)工作是通过使用相同的对象作为锁来同步 flow registration 块和 flow removing 块 .
所以负责注册和删除流的我的类看起来像这样:
...
private final Object lockA = new Object();
...
public void appendNewFlow(String callUUID){
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);
synchronized (lockA) {
flowContext.registration(integrationFlow).id(callUUID).register();
}
}
public void removeFlow(String flowId){
synchronized (lockA) {
flowContext.remove(flowId);
}
}
...
我现在的问题是这种锁对于应用来说有点沉重,因为我得到了很多:
...Waiting for workers to finish.
...
...Successfully waited for workers to finish.
这不会像我想的那样快 .
但我想这是预期的,因为每次线程获得锁定时,流程及其所有组件或流程及其所有组件都需要一些时间 .
1 回答
你也有这个:
如果你在那里添加一个
.id()
它是如何工作的?另一方面,既然你说这是同时发生的,我想知道你是否可以制作一些代码
synchonized
,例如包裹flowContext.registration(integrationFlow).id(callUUID).register();
.bean定义和注册过程实际上不是线程安全的,并且只能在应用程序生命周期开始时从一个初始化线程中使用 .
我们可能真的需要在
register(IntegrationFlowRegistrationBuilder builder)
函数中创建IntegrationFlowContext
作为线程安全,或者至少它的registerBean(Object bean, String beanName, String parentName)
,因为这正是我们生成bean名称并注册它的地方 .随意就此事提出JIRA .
遗憾的是,Spring Integration Java DSL扩展项目已经不再受支持,我们只能为当前的
5.x
一代添加修复程序 . 不过我认为synchonized
解决方法应该在这里工作,因此无需将其移植到Spring Integration Java DSL扩展中 .