首页 文章

如何在Akka Streams Kafka中构建ProducerMessage时配置偏移参数?

提问于
浏览
1

我正在使用Scala 2.11和Akka Streams Kafka 0.17 .

我有 stream 其中:

  • A Source 使用Source.actorRef创建 . 这里,actor被安排以一定的间隔运行并连续生成消息,这些消息被发送到流 .

  • 我已将 Producer 附加为 Flow . 制作人将 ProducerMessage.Message 推送到Kafka主题 .

  • 一些数据库操作 .

构建 ProducerMessage.Message 时出现问题,如下所示:

final case class Message[K, V, +PassThrough](
    record: ProducerRecord[K, V],
    passThrough: PassThrough
  )

我可以轻松传递包含实际消息的 record 参数 . 但我不知道在 passThrough 参数中要传递什么 . 根据docs

passThrough字段可以包含通过Consumer#flow传递并包含在Result中的任何元素 . 当需要在下游操作上传递某些上下文时,这很有用 . 这可以通过解压缩/ zip来完成,但这更方便 . 例如,它可以是ConsumerMessage.CommittableOffset或ConsumerMessage.CommittableOffsetBatch,可以在流程中稍后提交 .

在我的情况下,没有任何Kafka消费者订阅Kafka主题并为我的流生成 SourcecomittableSourceplainSource ) . 在这种情况下,我会按照文档中的描述传递消费者偏移量 . 但就我而言,演员正在模拟这样的消费者 . 这意味着我无法访问 ConsumerMessage.CommittableOffset . 那么我在这里传递 passThrough 参数是什么?在这种情况下,最佳做法是什么?

1 回答

  • 0

    在向 reactive-kafka 小组转发我的问题后,我得到了答案 . 基本上,他们所说的是,如果你没有 pass through 的任何用例,你可以尝试将其设置为 NoneNotUsed ,或者可能只是空字符串 "" .

    另请注意,如果您使用 Producer.plainSink ,则无需构建 ProducerMessage.Message . 然后,您可以直接构造一个Kafka ProducerRecord . 那个 ProducerMessage.Message 案例类只是需要或需要 pass through 的案例的容器 . 除了要传递的元素之外,它只包含一个Kafka ProducerRecord .

相关问题