首页 文章

Lagom中Message Broker的完整示例

提问于
浏览
0

我正在尝试使用Lagom 1.2.2实现一个Message Broker,并且遇到了障碍 . 该文档的服务描述符有以下示例:

default Descriptor descriptor() {
return named("helloservice").withCalls(...)
  // here we declare the topic(s) this service will publish to
  .publishing(
    topic("greetings", this::greetingsTopic)
  )
  ....;
}

这个例子用于实现:

public Topic<GreetingMessage> greetingsTopic() {
return TopicProducer.singleStreamWithOffset(offset -> {
    return persistentEntityRegistry
        .eventStream(HelloEventTag.INSTANCE, offset)
        .map(this::convertEvent);
  });
}

但是,没有 convertEvent() 函数的参数类型或返回类型的示例,这是我'm drawing a blank. On the other end, the subscriber to the MessageBroker, it seems that it'消耗 GreetingMessage 对象的地方,但是当我创建函数 convertEvent 以返回 GreetingMessage 对象时,我收到编译错误:

Error:(61, 21) java: method map in class akka.stream.javadsl.Source<Out,Mat> cannot be applied to given types;
  required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T>
  found: this::convertEvent
  reason: cannot infer type-variable(s) T
    (argument mismatch; invalid method reference
  incompatible types: akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset> cannot be converted to com.example.GreetingMessage)

有没有更详尽的如何使用它的例子?我已经检查了Chirper示例应用程序,它似乎没有这个例子 .

谢谢!

1 回答

  • 3

    您粘贴的错误消息告诉您 map 期望的确切内容:

    required: akka.japi.function.Function<akka.japi.Pair<com.example.GreetingEvent,com.lightbend.lagom.javadsl.persistence.Offset>,T>
    

    所以,你需要传递一个需要 Pair<GreetingEvent, Offset> 的函数 . 该函数应该返回什么?好吧,更新它以获取它,然后你'll get the next error, which once again will tell you what it was expecting you to return, and in this instance you' ll会发现它是 Pair<GreetingMessage, Offset> .

    要解释这些类型是什么 - Lagom需要跟踪哪些事件已发布到Kafka,以便在重新启动服务时,它不会从事件日志的开头开始,并从头开始重新发布所有事件 . 它通过使用偏移来实现 . 因此,事件日志会生成事件和偏移对,然后您需要将这些事件转换为将发布到Kafka的消息,并且当您将转换后的消息返回到Lagom时,它需要与偏移量成对你从事件日志中得到的,因此在发布到Kafka之后,Lagom可以保留偏移量,并在下次重新启动服务时将其用作起点 .

    这里可以看到一个完整的例子:https://github.com/lagom/online-auction-java/blob/a32e696/bidding-impl/src/main/java/com/example/auction/bidding/impl/BiddingServiceImpl.java#L91

相关问题