首页 文章

Spring Cloud Stream Dispatcher没有订阅者

提问于
浏览
1

Spring Cloud Stream Dispatcher没有订阅者错误 .

在成功启动Spring引导容器之后,我们需要在Kafka主题上放置一些通知消息,并且我们的几个微服务具有相同的功能,因此我们编写了一个包含输出通道定义和分派工具的公共jar . 只要我们在 SpringApplication.run 调用之后立即调用util,功能就会按预期工作 .

Following is one of our microservices Application class sample.

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        ConfigurableApplicationContext context =SpringApplication.run(Application.class, args);
        context.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
    }
}

上面的设置按预期工作,但这给开发人员在每个微服务上编写锅炉模板代码带来了不必要的负担 . 因此,为了避免这种情况,我们编写了一个Aspect实现来执行相同的功能,但是使用我们的方面方法,我们遇到了以下错误 .

org.springframework.context.ApplicationContextException:无法启动bean'outputBindingLifecycle';嵌套异常是org.springframework.messaging.MessageDeliveryException:Dispatcher没有通道'schedulertestsvcs的订阅者:dev:1180.scheduledJobExecutionResponseOutput' . ;嵌套异常是org.springframework.integration.MessageDispatchingException:Dispatcher没有订阅者

我们尝试了几种方法,比如Spring SmartLifeCycle来处理所有Kafka输出/输入通道启动完成,但所有这些方法都遇到了同样的错误 .

以下是我们在org.springframework.boot.SpringApplication.run(..)上的Aspect实现

@Aspect
@Component
public class SchedulerConsumerAspect {

    @Autowired
    protected ApplicationContext applicationContext;
    @AfterReturning(value = "execution(* org.springframework.boot.SpringApplication.run(..))",returning = "result")
    public void afterConsumerApplicationStartup(JoinPoint pjp, Object result) throws Throwable {
        if(result!=null){
            ConfigurableApplicationContext context=(ConfigurableApplicationContext) result;
            if(context.containsBean("schedulerConsumerUtils")){
                //For what ever reason the following call resulting in Dispatcher has no subscribers for channel error.
                //TODO fix the above issue and enable the following call.
                context.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
            }
        }
    }

}

在我们的调试会话期间,我们发现org.springframework.boot.SpringApplication.run(..)在引导过程中多次调用Aspect . 首先调用方面时,我们得到结果值为null,经过一段时间后,spring引导调用相同的方面,此时结果不为null . 即使在获得结果不为null之后,也没有被授予者组件完全初始化,这就是为什么你看到 context.containsBean("schedulerConsumerUtils") 的检查 . 但是在bean初始化之后,我们看到输出通道没有完全绑定 .

处理Spring Cloud Stream Kafka输出/输入通道绑定完成的最佳方法是什么?

为什么组件调用在SpringBoot应用程序中工作正常,而不是通过Aspect?我在这几天挣扎找不到合适的解决方案 . 任何帮助非常感谢 .

1 回答

  • 0

    我按照这篇文章的建议Spring cloud stream - send message after application initalization并使用了第三个选项ApplicationRunner . 前两个选项对我不起作用 .

    @Component
    public class AppStartup implements ApplicationRunner {
        @Autowired
        protected ApplicationContext applicationContext;
    
        @Override
        public void run(ApplicationArguments args) throws Exception {
            if(applicationContext!=null){
                applicationContext.getBean(SchedulerConsumerUtils.class).registerOrRestartConsumerJobs();
            }
        }
    }
    

相关问题