我正在尝试使用Spring Cloud Stream和Kafka集成创建Spring Boot应用程序 . 我在Kafka中创建了一个带有1个分区的示例主题,并根据此处给出的指示从Spring Boot应用程序发布了该主题
http://docs.spring.io/spring-cloud-stream/docs/1.0.2.RELEASE/reference/htmlsingle/index.html
和
https://blog.codecentric.de/en/2016/04/event-driven-microservices-spring-cloud-stream/
Spring Boot App -
@SpringBootApplication
public class MyApplication {
private static final Log logger = LogFactory.getLog(MyApplication.class);
public static void main(String[] args) {
SpringApplication.run(MyApplication.class, args);
}
}
Kafka 制片人班
@Service
@EnableBinding(Source.class)
public class MyProducer {
private static final Log logger = LogFactory.getLog(MyProducer.class);
@Bean
@InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10000", maxMessagesPerPoll = "1"))
public MessageSource<TimeInfo> timerMessageSource() {
TimeInfo t = new TimeInfo(new Timestamp(new Date().getTime())+"","Label");
MessageBuilder<TimeInfo> m = MessageBuilder.withPayload(t);
return () -> m.build();
}
public static class TimeInfo{
private String time;
private String label;
public TimeInfo(String time, String label) {
super();
this.time = time;
this.label = label;
}
public String getTime() {
return time;
}
public String getLabel() {
return label;
}
}
}
一切都运行良好,除非我想处理异常 .
如果Kafka主题发生故障,我可以看到在应用程序的日志文件中抛出了ConnectionRefused异常,但内置的重试逻辑似乎正在连续重试而不停止!
我没有异常处理和进行进一步的异常处理 . 我已经在上面的Spring Cloud Stream文档中阅读了Producer选项和Kafka的Binder选项,但是我看不到任何可能的自定义选项,以便让我捕获此异常 .
我是Spring Boot / Spring Cloud Stream / Spring Integration的新手(它似乎是 Cloud 流项目的底层实现) .
你还知道其他任何事情可以将这个异常级联到我的Spring Cloud Stream应用程序吗?