首页 文章

spring-cloud-stream kafka消费者并发

提问于
浏览
4

使用spring-cloud-stream的kafka Binders ,如何配置并发消息使用者(在单个消费者jvm中)?如果我理解正确,使用kafka时并发消息消耗需要分区,但s-c-s docs表示要使用分区,您需要通过partitionKeyExpression或partitionKeyExtractorClass在生成器中指定分区选择 . Kafka docs提到了循环分区 .

s-c-s文档根本没有提到spring.cloud.stream.bindings . * . 并发性,尽管在我上面描述的用例中这似乎很重要 . 使用 生产环境 者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save:
          destination: customer-save
          group: customer-save
          content-type: application/json
          partitionCount: 3

和消费者配置

spring:
  cloud:
    stream:
      bindings:
        customer-save: 
          destination: customer-save
          group: customer-save
          content-type: application/x-java-object;type=foo.Customer
          partitioned: true
          concurrency: 3

我似乎得到了我想要的行为(至少在某种程度上) . 我可以看到有时有3个消费者线程处于活动状态,虽然在播放时似乎除了循环之外似乎有一些分区,因为一些消息似乎等待繁忙的消费者线程并在该线程完成后消耗掉 . 我假设这是因为消息被发送到同一个分区 .

当我没有指定partitionKeyExpression或partitionKeyExtractorClass时,是否有一些默认的密钥提取和分区策略在 生产环境 者上使用?这是使用kafka设置s-c-s使用者的合适方法吗?您希望多个线程消费消息以提高消费者吞吐量?

1 回答

  • 2

    由于您的 生产环境 者未被分区(没有设置 partitionKeyExpression ), 生产环境 者方将在3个分区上循环(如果这不是观察到的行为,请在Git Hub中打开一张票) . 如果您配置了 partitionKeyExpression ,则 生产环境 者将根据配置的逻辑有效地对数据进行分区 .

    在消费者方面,我们确保线程/分区关联,因为这是一个受到广泛尊重的Kafka约定 - 我们确保按顺序处理给定分区上的消息 - 这可能会考虑您正在观察的行为 . 如果将消息A,B,C,D发送到分区0,1,2,0-D将必须等待直到处理A,即使有两个其他线程可用 .

    提高吞吐量的一个选择是过度分配(这是Kafka中相当典型的策略) . 这会进一步扩散消息,并增加消息发送到不同线程的机会 .

    如果您不关心订购,那么增加吞吐量的另一个选择是在下游异步处理消息:例如通过将输入通道桥接到ExecutorChannel .

    一般来说, partitioned 指的是客户端接收分区数据的能力(Kafka客户端总是被分区,但此设置也适用于Rabbit和/或Redis) . 它与属性 instanceIndexinstanceCount 结合使用,以确保在多个应用程序实例之间正确划分主题的分区(另请参阅http://docs.spring.io/spring-cloud-stream/docs/1.0.0.M4/reference/htmlsingle/index.html#_instance_index_and_instance_count

相关问题