我正在使用Scala 2.11和Akka Streams Kafka 0.17 .
我有 stream 其中:
-
A
Source
使用Source.actorRef创建 . 这里,actor被安排以一定的间隔运行并连续生成消息,这些消息被发送到流 . -
我已将
Producer
附加为Flow
. 制作人将ProducerMessage.Message
推送到Kafka主题 . -
一些数据库操作 .
构建 ProducerMessage.Message
时出现问题,如下所示:
final case class Message[K, V, +PassThrough](
record: ProducerRecord[K, V],
passThrough: PassThrough
)
我可以轻松传递包含实际消息的 record
参数 . 但我不知道在 passThrough
参数中要传递什么 . 根据docs:
passThrough字段可以包含通过Consumer#flow传递并包含在Result中的任何元素 . 当需要在下游操作上传递某些上下文时,这很有用 . 这可以通过解压缩/ zip来完成,但这更方便 . 例如,它可以是ConsumerMessage.CommittableOffset或ConsumerMessage.CommittableOffsetBatch,可以在流程中稍后提交 .
在我的情况下,没有任何Kafka消费者订阅Kafka主题并为我的流生成 Source
( comittableSource
或 plainSource
) . 在这种情况下,我会按照文档中的描述传递消费者偏移量 . 但就我而言,演员正在模拟这样的消费者 . 这意味着我无法访问 ConsumerMessage.CommittableOffset
. 那么我在这里传递 passThrough
参数是什么?在这种情况下,最佳做法是什么?
1 回答
在向
reactive-kafka
小组转发我的问题后,我得到了答案 . 基本上,他们所说的是,如果你没有pass through
的任何用例,你可以尝试将其设置为 None 或 NotUsed ,或者可能只是空字符串 "" .另请注意,如果您使用
Producer.plainSink
,则无需构建ProducerMessage.Message
. 然后,您可以直接构造一个KafkaProducerRecord
. 那个ProducerMessage.Message
案例类只是需要或需要pass through
的案例的容器 . 除了要传递的元素之外,它只包含一个KafkaProducerRecord
.