
Spring Integration: Reading from a Kafka Queue


In a Spring Boot application, I want to use Spring Integration to read from a Kafka queue. The configuration is as follows:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = ... // set properties
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public DirectChannel receiver() {
    return new DirectChannel();
}

@Autowired
private Resolver resolver;

@Bean
public EventDrivenConsumer getEventDrivenConsumer() {
    return new EventDrivenConsumer(receiver(), resolver);
}

The Resolver bean implements MessageHandler.

Messages are received from the queue, but the Resolver bean does not process them.

The Spring Boot application is annotated as follows:

@SpringBootApplication(exclude = KafkaAutoConfiguration.class)

So there should be no auto-configuration of Kafka beans.

Here is the error:

java.lang.NullPointerException: null
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]

Debugging shows that in RecordMessagingMessageListenerAdapter (at the top of the stack trace), this.methodHandler is null.

What is the correct way in Spring Integration to connect a channel to the bean that should process the messages?

1 Answer


    Here is the resolution:

    The project's parent is declared as follows:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    

    For the following dependency:

    <dependency>
        <groupId>org.springframework.integration</groupId>
        <artifactId>spring-integration-kafka</artifactId>
        <version>${spring-integration-kafka.version}</version>
    </dependency>
    

    the property spring-integration-kafka.version had previously been set to:

    3.0.1.RELEASE
    

    After changing this to:

    2.1.0.RELEASE
    

    everything works.

    However, without an explicit version for spring-integration-kafka, the project fails to build because of missing classes.

    One of the advantages of Boot is that it manages dependency versions. Perhaps there should be a spring-boot-integration-kafka dependency, which would prevent this problem.
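
The fix above suggests a version mismatch between spring-integration-kafka and the spring-kafka jar that Boot puts on the class path (the stack trace shows spring-kafka-1.1.7.RELEASE). One way to confirm which jars are actually loaded is a small diagnostic like the sketch below. It uses only JDK calls. The class name JarVersionCheck and the helper describe are hypothetical names chosen for illustration, and the version comes from the jar manifest, so it may print as "unknown" if the manifest does not declare one.

```java
import java.security.CodeSource;

// Hypothetical diagnostic: reports the manifest version and location of a class.
final class JarVersionCheck {

    /** Returns a one-line description of where a class was loaded from. */
    public static String describe(String className) {
        try {
            // Load without initializing, so no static initializers run.
            Class<?> type = Class.forName(className, false,
                    JarVersionCheck.class.getClassLoader());
            Package pkg = type.getPackage();
            String version = (pkg != null) ? pkg.getImplementationVersion() : null;
            CodeSource source = type.getProtectionDomain().getCodeSource();
            String location = (source != null)
                    ? String.valueOf(source.getLocation())
                    : "(no code source)";
            return className + " -> version "
                    + (version != null ? version : "unknown")
                    + ", loaded from " + location;
        } catch (ClassNotFoundException | LinkageError e) {
            return className + " -> could not be loaded ("
                    + e.getClass().getSimpleName() + ")";
        }
    }

    public static void main(String[] args) {
        // Both class names appear in the question's configuration.
        System.out.println(describe(
                "org.springframework.kafka.listener.KafkaMessageListenerContainer"));
        System.out.println(describe(
                "org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter"));
    }
}
```

Run from inside the application (for example, from a test or a temporary main method on the same class path), this prints the jar each class came from, which makes it easier to see whether the two libraries belong to compatible release lines. Running `mvn dependency:tree` gives the same information at build time.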
