首页 文章

Lagom PubSubRef订阅者丢弃消息

提问于
浏览
0

[Attention] The question is Lagom framework specific!

在我目前的项目中,当上游速度很快并且看起来像下游无法及时处理所有消息时,已经观察到从Source到Kafka主题发布者切断消息列表的问题 . 实现,切割与PubSubRef.subscribe()方法的行为有关https://github.com/lagom/lagom/blob/master/pubsub/javadsl/src/main/scala/com/lightbend/lagom/javadsl/pubsub/PubSubRef.scala#L85

完整的方法定义:

def subscriber(): Source[T, NotUsed] = {
scaladsl.Source.actorRef[T](bufferSize, OverflowStrategy.dropHead)
  .mapMaterializedValue { ref =>
    mediator ! Subscribe(topic.name, ref)
    NotUsed
  }.asJava
}

有使用OverflowStrategy.dropHead . 可以改为使用back-pressure strategy吗?

UPD#1: 用例非常简单,当查询请求发布到命令主题中,从DB表中获取它并查询对象时,生成的列表被推送到结果Kafka主题中 . 代码段:

objectsResultTopic = pubSub.refFor(TopicId.of(CustomObject.class, OBJECTS_RESULT_TOPIC));
objectQueryTopic().subscribe().atLeastOnce(
Flow.fromSinkAndSource(
    Flow.fromFunction(this::deserializeCommandAndQueryObjects)
        .mapAsync(concurrency, objects -> objects)
        .flatMapMerge(concurrency, objects -> objects)
        .alsoTo(Sink.foreach(event -> LOG.trace("Sending object {}", object)))
        .to(objectsResultTopic.publisher()),
    Source.repeat(Done.getInstance())
    )
)

如果 deserializeCommandAndQueryObjects 函数生成的对象流超过默认缓冲区大小= 1000,则会开始剪切元素(我们的情况是~2.5k对象) .

UPD#2: 对象数据源是:

// returns CompletionStage<Source<CustomObject, ?>>
jdbcSession.withConnection(
  connection -> Source.from(runQuery(connection, rowConverter))
)

并且订阅了Kafka objectsResultTopic

TopicProducer.singleStreamWithOffset(
offset -> objectsResultTopic.subscriber().map(gm -> {
    JsonNode node = mapper.convertValue(gm, JsonNode.class);
    return Pair.create(node, offset);
}));

2 回答

  • 4

    如果有人感兴趣,最后我们通过使用Akka Producer API解决了这个问题,例如:

    ProducerSettings<String, CustomObject> producerSettings = ProducerSettings.create(system, new StringSerializer(), new CustomObjectSerializer());
    objectQueryTopic().subscribe().atLeastOnce(
    Flow.fromSinkAndSource(
        Flow.fromFunction(this::deserializeCommandAndQueryObjects)
            .mapAsync(concurrency, objects -> objects)
            .flatMapMerge(concurrency, objects -> objects)
            .alsoTo(Sink.foreach(object -> LOG.trace("Sending event {}", object)))
            .map(object -> new ProducerRecord<String, CustomObject>(OBJECTS_RESULT_TOPIC, object))
            .to(Producer.plainSink(producerSettings)),
        Source.repeat(Done.getInstance())));
    

    它没有缓冲工作,只是推动 Kafka 主题 .

  • 1

    听起来像Lagom的distributed publish-subscribe功能可能不是你工作的最佳工具 .

    你的问题提到 Kafka ,但这个功能没有使用 Kafka . 相反,它通过直接向群集中的所有订户广播消息来工作 . 这是一种“最多一次”的消息传输,可能确实会丢失消息,并且适用于那些更关心跟上最近消息而不是处理每一条消息的消费者 . 溢出策略不可自定义,并且您不希望在这些用例中使用反压,因为这意味着一个缓慢的消费者可能会减慢向所有其他订阅者的传递速度 .

    您还有其他一些选择:

    • 如果你想使用Kafka,你应该使用Lagom的message broker API . 这支持"at least once"传递语义,可用于确保每个使用者处理每条消息(代价是可能增加延迟) .

    在这种情况下,Kafka充当了一个巨大的持久缓冲区,因此它甚至比背压更好: 生产环境 者和消费者可以以不同的速度前进,并且(当与partitioning一起使用时)您可以添加消费者以扩展和处理消息需要时更快 .

    当 生产环境 者和消费者都在同一服务中时,可以使用消息代理API,但它特别适合于服务之间的通信 .

    • 如果您发送的消息是持久性实体事件,并且消费者是同一服务的一部分,则persistent read-side processor可能是一个不错的选择 .

    这也提供"at least once"传递,如果处理消息的唯一影响是数据库更新,则对Cassandra read-side databasesrelational read-side databases的内置支持提供"effectively once"语义,其中数据库更新以事务方式运行,以确保在事件处理期间发生的故障不会导致部分更新 .

    • 如果您发送的消息是持久性实体事件,则消费者是同一服务的一部分,但您希望将事件作为流处理,您可以访问raw stream of events .

    • 如果您的用例不适合Lagom明确支持的用例之一,您可以使用较低级别的Akka API(包括distributed publish-subscribe)来实现更符合您需求的内容 .

    最佳选择取决于您的用例的具体情况:消息来源和您想要的消费者类型 . 如果您使用更多详细信息更新问题并为此答案添加评论,我可以使用更具体的建议编辑答案 .

相关问题