首页 文章

spring-integration amqp出站适配器竞争条件?

提问于
浏览
2

我们的一个 生产环境 应用程序中有一个相当复杂的spring-integration-amqp用例,我们在启动时看到了一些“org.springframework.integration.MessageDispatchingException:Dispatcher没有订阅者”异常 . 在启动时出现初始错误后,我们不再从相同的组件中看到这些异常 . 这看起来像依赖于AMQP出站适配器的组件上的某种启动竞争条件,并最终在生命周期的早期使用它们 .

我可以通过调用发送到PostConstruct方法中连接到出站适配器的通道的网关来重现这一点 .

配置:

package gadams;

import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.amqp.Amqp;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.messaging.MessageChannel;

@SpringBootApplication
@IntegrationComponentScan
public class RabbitRace {

    public static void main(String[] args) {
        SpringApplication.run(RabbitRace.class, args);
    }

    @Bean(name = "HelloOut")
    public MessageChannel channelHelloOut() {
        return MessageChannels.direct().get();
    }

    @Bean
    public Queue queueHello() {
        return new Queue("hello.q");
    }

    @Bean(name = "helloOutFlow")
    public IntegrationFlow flowHelloOutToRabbit(RabbitTemplate rabbitTemplate) {
        return IntegrationFlows.from("HelloOut").handle(Amqp.outboundAdapter(rabbitTemplate).routingKey("hello.q"))
                .get();
    }

}

网关:

package gadams;

import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;

@MessagingGateway
public interface HelloGateway {

    @Gateway(requestChannel = "HelloOut")
    void sendMessage(String message);
}

零件:

package gadams;

import javax.annotation.PostConstruct;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.DependsOn;
import org.springframework.stereotype.Component;

@Component
@DependsOn("helloOutFlow")
public class HelloPublisher {

    @Autowired
    private HelloGateway helloGateway;

    @PostConstruct
    public void postConstruct() {
        helloGateway.sendMessage("hello");
    }
}

在我的 生产环境 用例中,我们有一个带有PostConstruct方法的组件,我们使用TaskScheduler来调度一些组件,其中一些组件依赖于AMQP出站适配器,其中一些组件最终会立即执行 . 我已经尝试在涉及出站适配器的IntegrationFlows上放置bean名称,并在使用网关和/或网关本身的bean上使用@DependsOn,但这并没有消除启动时的错误 .

1 回答

  • 1

    那一切都叫 Lifecycle . 任何Spring Integration endpoints 仅在执行 start() 时才开始侦听或生成消息 .

    通常对于标准默认 autoStartup = true ,它在 ApplicationContext.finishRefresh(); 中作为a完成

    // Propagate refresh to lifecycle processor first.
    getLifecycleProcessor().onRefresh();
    

    @PostConstructafterPropertiesSet() )开始向 Channels 发送消息真的很早,因为它远离 finishRefresh() .

    你真的应该重新考虑你的 生产环境 逻辑和实施到 SmartLifecycle.start() 阶段 .

    请参阅Reference Manual中的更多信息 .

相关问题