
Spring Cloud Stream: producing to and consuming from the same topic


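I have a service that uses Spring Boot and Spring Cloud Stream. The service produces to a topic and also consumes from that same topic. When I start the service for the first time and the topic does not yet exist in Kafka, the following exception is thrown: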

java.lang.IllegalStateException: The number of expected partitions was: 100, but 3 have been found instead
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner$2.doWithRetry(KafkaTopicProvisioner.java:260) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner$2.doWithRetry(KafkaTopicProvisioner.java:246) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:286) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:163) ~[spring-retry-1.2.0.RELEASE.jar!/:na]
    at org.springframework.cloud.stream.binder.kafka.provisioning.KafkaTopicProvisioner.getPartitionsForTopic(KafkaTopicProvisioner.java:246) ~[spring-cloud-stream-binder-kafka-core-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:149) [spring-cloud-stream-binder-kafka-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.createProducerMessageHandler(KafkaMessageChannelBinder.java:88) [spring-cloud-stream-binder-kafka-1.2.1.RELEASE.jar!/:1.2.1.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:112) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder.doBindProducer(AbstractMessageChannelBinder.java:57) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
    at org.springframework.cloud.stream.binder.AbstractBinder.bindProducer(AbstractBinder.java:152) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.bindProducer(BindingService.java:124) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
    at org.springframework.cloud.stream.binding.BindableProxyFactory.bindOutputs(BindableProxyFactory.java:238) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
    at org.springframework.cloud.stream.binding.OutputBindingLifecycle.start(OutputBindingLifecycle.java:57) [spring-cloud-stream-1.2.2.RELEASE.jar!/:1.2.2.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:…) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:50) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:348) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:151) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]
    at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:114) [spring-context-4.3.7.RELEASE.jar!/:4.3.7.RELEASE]

application.yml:

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          zkNodes: zookeeper
          defaultZkPort: 2181
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          test-updater-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
          test-updater-output:
            producer:
              sync: true
              configuration:
                retries: 0
          tenant-updater-output:
            producer:
              sync: true
              configuration:
                retries: 100
      default:
        binder: kafka
        contentType: application/json
        group: test-adapter
        consumer:
          maxAttempts: 1
      bindings:
        test-updater-input:
          destination: test-tenant-update
          consumer:
            concurrency: 3
            partitioned: true
        test-updater-output:
          destination: test-tenant-update
          producer:
            partitionCount: 100
        tenant-updater-output:
          destination: tenant-entity-update
          producer:
            partitionCount: 100

I tried changing the order of the producer and consumer configuration, but it did not help.

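EDIT: I added the full application.yml. The topic does not exist in Kafka when the service is started for the first time.
It looks like there is a conflict between the producer and consumer configuration. I think it reports 3 partitions because the consumer's concurrency is 3: the topic is first created with 3 partitions, and when the binder then moves on to the producer configuration it does not adjust the partition count.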

1 Answer

  • 0

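    The number of expected partitions was: 100, but 3 have been found instead

    The topic does not have enough partitions for your configuration.

    partitionCount: 100

    Either change the configuration to 3, or change the number of partitions on the topic to 100.

    Or set spring.cloud.stream.kafka.binder.autoAddPartitions to true.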
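
As an illustration of the first option in the answer (matching the configuration to the topic as it exists), the producer binding from the question could be adjusted roughly as follows. This is only a sketch: it assumes the topic test-tenant-update really has 3 partitions, as the exception reports, and it shows only the one binding that writes to that topic. The rest of application.yml is assumed to stay unchanged.

```yaml
spring:
  cloud:
    stream:
      bindings:
        test-updater-output:
          destination: test-tenant-update
          producer:
            # Assumption: the existing topic has 3 partitions (the count
            # reported in the exception), so the expected count is set to match.
            partitionCount: 3
```

With this value the expected partition count equals the count the exception says was found on the topic, which is the mismatch the answer describes.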
