首页 文章

Spring Boot Kafka消息消费者和丢失的消息

提问于
浏览
0

我使用Spring Boot 2.0.2和Spring Kafka . 我还使用以下存储库中的Kafka Docker image 1.1.0:https://hub.docker.com/r/wurstmeister/kafka/tags/

这是我的 Kafka 配置:

@Configuration
@EnableKafka
public class KafkaConfig {
}

@Configuration
public class KafkaConsumerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;

    @Bean
    public Map<String, Object> consumerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));

        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory());

        return factory;
    }

}

@Configuration
public class KafkaProducerConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Bean
    public Map<String, Object> producerConfigs() {

        Map<String, Object> props = new HashMap<>();

        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
        props.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, 15000000);

        return props;
    }

    @Bean
    public ProducerFactory<String, Post> postProducerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, Post> postKafkaTemplate() {
        return new KafkaTemplate<>(postProducerFactory());
    }

}

这是Kafka application.properties

#Kafka
spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=postfenix
kafka.topic.posts.create=posts.create

这是我的消息监听器:

@Component
public class PostConsumer {

    static final Logger logger = LoggerFactory.getLogger(PostConsumer.class);

    @KafkaListener(topics = "${kafka.topic.posts.create}", containerFactory = "postKafkaListenerContainerFactory")
    public void createPost(ConsumerRecord<String, Post> consumerRecord) {

        Post post = consumerRecord.value();

        logger.info("Received message for post creation: {}", post);
    }

}

我还实现了 PostService ,它应该将 Post 发送到Kafka主题:

@Service
public class PostServiceImpl implements PostService {

    static final Logger logger = LoggerFactory.getLogger(PostServiceImpl.class);

    @Value("${kafka.topic.posts.create}")
    private String kafkaTopicPostsCreate;

    @Autowired
    private KafkaTemplate<String, Post> postKafkaTemplate;

    @Override
    public void sendPost(Post post) {

        postKafkaTemplate.send(kafkaTopicPostsCreate, post);

        logger.info("Message sent to the post creation queue: {}", post);
    }

}

我还实现了SpringBoot测试:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = { TestApplication.class })
public class PostServiceIT {

    @Autowired
    private PostService postService;

    @Autowired
    private MessageRepository messageRepository;

    @Before
    public void setUp() {
        messageRepository.deleteAll();
    }

    @Test
    public void testCreatePost() throws InterruptedException {

        assertEquals(0, messageRepository.findAll().size());

        Post post = new Post();

        ...

        postService.sendPost(post);

        await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);
    }

}

这是日志:

2018-06-09 16:12:37.547  INFO 17824 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler 'schedulerFactoryBean' initialized from an externally provided properties instance.
2018-06-09 16:12:37.547  INFO 17824 --- [           main] org.quartz.impl.StdSchedulerFactory      : Quartz scheduler version: 2.3.0
2018-06-09 16:12:37.548  INFO 17824 --- [           main] org.quartz.core.QuartzScheduler          : JobFactory set to: org.springframework.scheduling.quartz.AdaptableJobFactory@7a3e5cd3
2018-06-09 16:12:38.967  INFO 17824 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483547
2018-06-09 16:12:38.997  INFO 17824 --- [           main] o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [127.0.0.1:9093]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = postfenix
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 600000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 305000
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.springframework.kafka.support.serializer.JsonDeserializer

2018-06-09 16:12:39.095  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.1.0
2018-06-09 16:12:39.095  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fdcf75ea326b8e07
2018-06-09 16:12:39.100  INFO 17824 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService 
2018-06-09 16:12:39.104  INFO 17824 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2018-06-09 16:12:39.104  INFO 17824 --- [           main] o.s.s.quartz.SchedulerFactoryBean        : Starting Quartz Scheduler now
2018-06-09 16:12:39.104  INFO 17824 --- [           main] org.quartz.core.QuartzScheduler          : Scheduler schedulerFactoryBean_$_NON_CLUSTERED started.
2018-06-09 16:12:39.111  INFO 17824 --- [SchedulerThread] c.n.quartz.mongodb.dao.TriggerDao        : Found 0 triggers which are eligible to be run.
2018-06-09 16:12:39.119  INFO 17824 --- [           main] com.postfenix.domain.post.PostServiceIT  : Started PostServiceIT in 5.094 seconds (JVM running for 5.74)
2018-06-09 16:12:39.121  INFO 17824 --- [           main] c.p.d.configuration.TestApplication      : Initializing application...
2018-06-09 16:12:39.258  INFO 17824 --- [           main] org.mongodb.driver.connection            : Opened connection [connectionId{localValue:4, serverValue:4}] to localhost:27018
2018-06-09 16:12:39.338  WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 2 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.339  INFO 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.Metadata        : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
2018-06-09 16:12:39.392  INFO 17824 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    batch.size = 16384
    bootstrap.servers = [127.0.0.1:9093]
    buffer.memory = 33554432
    client.id = 
    compression.type = none
    connections.max.idle.ms = 540000
    enable.idempotence = false
    interceptor.classes = []
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    linger.ms = 0
    max.block.ms = 60000
    max.in.flight.requests.per.connection = 5
    max.request.size = 15000000
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    receive.buffer.bytes = 32768
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retries = 0
    retry.backoff.ms = 100
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = null
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    transaction.timeout.ms = 60000
    transactional.id = null
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

2018-06-09 16:12:39.419  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.1.0
2018-06-09 16:12:39.419  INFO 17824 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : fdcf75ea326b8e07
2018-06-09 16:12:39.437  WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 1 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.437  INFO 17824 --- [ad | producer-1] org.apache.kafka.clients.Metadata        : Cluster ID: BYqDmOq_SDCll0ILZI_KoA
2018-06-09 16:12:39.454  WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 4 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.565  WARN 17824 --- [ad | producer-1] org.apache.kafka.clients.NetworkClient   : [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.590  WARN 17824 --- [ntainer#0-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=postfenix] Error while fetching metadata with correlation id 6 : {posts.create=LEADER_NOT_AVAILABLE}
2018-06-09 16:12:39.704  INFO 17824 --- [           main] c.p.domain.service.post.PostServiceImpl  : Message sent to the post creation queue: Post [chatId=@name, parseMode=HTML]
2018-06-09 16:12:40.229  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Discovered group coordinator 10.0.75.1:9093 (id: 2147482646 rack: null)
2018-06-09 16:12:40.232  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Revoking previously assigned partitions []
2018-06-09 16:12:40.233  INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked: []
2018-06-09 16:12:40.233  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] (Re-)joining group
2018-06-09 16:12:40.295  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Successfully joined group with generation 1
2018-06-09 16:12:40.297  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-1, groupId=postfenix] Setting newly assigned partitions [posts.create-0]
2018-06-09 16:12:40.313  INFO 17824 --- [ntainer#0-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-1, groupId=postfenix] Resetting offset for partition posts.create-0 to offset 1.
2018-06-09 16:12:40.315  INFO 17824 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [posts.create-0]

现在,我的测试在以下行中失败:

await().atMost(60, SECONDS).pollDelay(1000, MILLISECONDS).until(() -> messageRepository.findAll().size() == 1);

因为在第一次测试运行后,由于某种原因,消息未传递给 PostConsumer.createPost 方法 . 但是如果我在同一个Kafka docker实例上第二次运行相同的测试,则上一次测试运行的消息将成功传递到 PostConsumer.createPost . 我做错了什么以及为什么在第一次测试运行后没有传递消息以及如何修复它?

UPDATED

这是我更新的 KafkaConsumerConfig

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());

        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory() {

        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory());

        return factory;
    }

}

现在我在 kafkaListenerContainerFactorypostConsumerFactory 方法中有2个编译错误,因为 consumerConfigs() 方法不存在, kafkaListenerContainerFactory 中的 consumerFactory 方法需要 KafkaProperties .

1 回答

  • 2

    spring.kafka.consumer.auto-offset-reset =最早的spring.kafka.consumer.group-id = postfenix

    您没有使用这些引导属性,因为您正在创建自己的使用者配置 .

    你应该替换它

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    
    @Value("${spring.kafka.consumer.group-id}")
    private String consumerGroupId;
    
    @Bean
    public Map<String, Object> consumerConfigs() {
    
        Map<String, Object> props = new HashMap<>();
    
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, (int) TimeUnit.MINUTES.toMillis(10));
    
        return props;
    }
    
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }
    

    @Bean
    public ConsumerFactory<String, String> consumerFactory(
             KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
             new StringDeserializer(), new JsonDeserializer<>(String.class));
    }
    

    EDIT

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
    
        return factory;
    }
    
    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    
        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    
        return factory;
    }
    

相关问题