首页 文章

如何使用Spring Cloud Stream Kafka和每服务数据库实现微服务事件驱动架构

提问于
浏览
6

我正在尝试实现事件驱动的体系结构来处理分布式事务 . 每个服务都有自己的数据库,并使用Kafka发送消息以通知其他微服务有关操作 .

一个例子:

Order service -------> | Kafka |------->Payment Service
       |                                       |
Orders MariaDB DB                   Payment MariaDB Database

订单收到订单请求 . 它必须将新订单存储在其数据库中并发布消息,以便付款服务意识到它必须为该项目收费:

私人订单业务订单业务;

@PostMapping
public Order createOrder(@RequestBody Order order){
    logger.debug("createOrder()");
    //a.- Save the order in the DB
    orderBusiness.createOrder(order);
    //b. Publish in the topic so that Payment Service charges for the item.
    try{
        orderSource.output().send(MessageBuilder.withPayload(order).build());
    }catch(Exception e){
        logger.error("{}", e);
    }
    return order;
}

这些是我的疑惑:

  • 步骤a.-(保存在订单DB中)和b.-(发布消息)应该在事务中以原子方式执行 . 我怎样才能做到这一点?

  • 这与前一个相关:我发送消息:orderSource.output() . send(MessageBuilder.withPayload(order).build());无论Kafka代理是否已关闭,此操作都是异步的,并且总是返回true . 我如何知道该消息已经到达Kafka经纪人?

2 回答

  • 2

    步骤a.-(保存在订单DB中)和b.-(发布消息)应该在事务中以原子方式执行 . 我怎样才能做到这一点?

    Kafka目前不支持事务(因此也不支持回滚或提交),您可以按照自己的意愿执行操作 . 当KIP-98合并时,这将在近期未来发生变化,但这可能还需要一些时间 . 此外,即使在Kafka中进行事务处理,跨两个系统的原子事务也是一件非常困难的事情,随后的所有事情只会通过Kafka中的事务支持得到改善,它仍然无法完全解决您的问题 . 为此,您需要考虑在您的系统中实现某种形式的two phase commit .

    您可以通过配置 生产环境 者属性来稍微接近,但最终您必须为您的一个系统(MariaDB或Kafka)选择至少一次或最多一次 .

    让我们从您在Kafka可以做的事情开始,确保传递信息,然后我们将深入了解整个流程的选项以及结果 .

    Guaranteed delivery

    在使用参数acks向您返回请求之前,您可以配置有多少代理必须确认收到您的消息:通过将此设置为您告诉代理等待所有副本确认您的消息然后向您返回答案之前的所有经纪人 . 这仍然不能100%保证您的消息不会丢失,因为它只是已经写入页面缓存而且存在理论情况,其中代理在持久存储到光盘之前失败,其中消息可能仍然丢失 . 但这是一个很好的保证,你会得到 . 您可以通过降低经纪人强制fsync到光盘(强调文本和/或flush.ms)的间隔来进一步降低数据丢失的风险,但请注意,这些值会带来严重的性能损失 .

    除了这些设置之外,您还需要等待Kafka 生产环境 者将请求的响应返回给您,并检查是否发生了异常 . 这与你问题的第二部分有关,所以我将进一步深入研究 . 如果响应是干净的,您可以确定您的数据到达Kafka并开始担心MariaDB .

    到目前为止我们所涵盖的所有内容仅涉及如何确保Kafka收到您的消息,但您还需要将数据写入MariaDB,这也可能会失败,这将使您有必要回忆一下您可能已发送给Kafka的消息 - 这是你不能做到的 .

    所以基本上你需要选择一个系统,你可以更好地处理重复/缺失值(取决于你是否重新发送部分失败),这将影响你做事的顺序 .

    Option 1

    Kafka first

    在此选项中,您在MariaDB中初始化事务,然后将消息发送到Kafka,等待响应,如果发送成功,则在MariaDB中提交事务 . 如果向Kafka发送失败,您可以在MariaDB中回滚您的交易,一切都很花哨 . 但是,如果发送到Kafka成功并且由于某种原因您对MariaDB的提交失败,则无法从Kafka获取消息 . 因此,如果您稍后重新发送所有内容,您将在MariaDB中丢失消息或在Kafka中发送重复消息 .

    Option 2

    MariaDB first

    这几乎就是另一种方式,但你可能更好能够删除使用MariaDB编写的消息,具体取决于您的数据模型 .

    当然,您可以通过跟踪失败的发送并稍后重试这两种方法来缓解这两种方法,但所有这些都是更大问题上的绑定 .

    我个人会采用方法1,因为提交失败的可能性应该小于发送本身,并在 Kafka 的另一边实施某种欺骗检查 .


    这与前一个相关:我发送消息:orderSource.output() . send(MessageBuilder.withPayload(order).build());无论Kafka代理是否已关闭,此操作都是异步的,并且总是返回true . 我如何知道该消息已经到达Kafka经纪人?

    首先,我承认我不熟悉Spring,所以这可能对您没用,但下面的代码片段说明了检查异常的产生响应的一种方法 . 通过调用flush阻止,直到所有发送完成(并且失败或成功),然后检查结果 .

    Producer<String, String> producer = new KafkaProducer<>(myConfig);
    final ArrayList<Exception> exceptionList = new ArrayList<>();
    
    for(MessageType message : messages){
      producer.send(new ProducerRecord<String, String>("myTopic", message.getKey(), message.getValue()), new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
          if (exception != null) {
            exceptionList.add(exception);
          }
        }
      });
    }
    
    producer.flush();
    
    if (!exceptionList.isEmpty()) {
      // do stuff
    }
    
  • 9

    我认为实现事件源的正确方法是直接从一个插件推送的事件填充Kafka,该插件从RDBMS binlog读取,例如使用Confluent BottledWater(https://www.confluent.io/blog/bottled-water-real-time-integration-of-postgresql-and-kafka/)或更活跃的Debezium(http://debezium.io/) . 然后,消费微服务可以监听这些事件,使用它们并在各自的数据库上执行操作,最终与RDBMS数据库保持一致 .

    看看我的完整答案的指南:https://stackoverflow.com/a/43607887/986160

相关问题