首页 文章

事件采购 - Apache Kafka Kafka Streams - 如何确保原子性/交易性

提问于
浏览
1

我使用Apache Kafka Streams评估事件采购,以了解复杂场景的可行性 . 与关系数据库一样,我遇到过一些案例,原子性/事务性是必不可少的:

购物应用程序有两个服务:

  • OrderService :有一个带有订单的Kafka Streams商店(OrdersStore)

  • ProductService :有一个Kafka Streams商店(ProductStockStore),其中包含产品及其库存 .

流:

  • OrderService发布OrderCreated事件(带有productId,orderId,userId信息)

  • ProductService获取OrderCreated事件并查询其KafkaStreams Store(ProductStockStore)以检查产品是否有库存 . 如果有库存,它会发布OrderUpdated事件(也包含productId,orderId,userId info)

关键是这个事件将由ProductService Kafka Stream监听,它会处理它以减少库存,到目前为止一直很好 .

但是,想象一下:

  • 客户1下订单,order1(产品库存为1)

  • 客户2同时为同一产品(库存仍为1)另外下订单order2

  • ProductService处理order1并发送消息OrderUpdated以减少库存 . 此消息放在order2 - > OrderCreated之后的主题中

  • ProductService处理order2-OrderCreated并发送消息OrderUpdated以再次减少库存 . 这是不正确的,因为它会引入不一致(库存现在应为0) .

显而易见的问题是,当我们处理第一个OrderUpdated事件时,应该直接更新我们的物化视图(商店) . 然而,更新Kafka Stream Store的唯一方法(我知道)是发布另一个由Kafka Stream处理的事件(OrderUpdated) . 这样我们就无法以事务方式执行此更新 .

我希望能够处理这样的场景 .

UPDATE: 我会试着澄清问题的有问题:

ProductService 有一个Kafka Streams商店, ProductStock 有这个股票 (productId=1, quantity=1)

OrderServiceorders topic 上发布了两个 OrderPlaced 事件:

  • Event1 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")

  • Event2 (key=product1, productId=product1, quantity=1, eventType="OrderPlaced")

ProductService 在订单主题上有一个消费者 . 为简单起见,我们假设 single partition 按顺序确保消息消耗 . 此使用者执行以下逻辑:

if("OrderPlaced".equals(event.get("eventType"))){

    Order order = new Order();
    order.setId((String)event.get("orderId"));
    order.setProductId((Integer)(event.get("productId")));
    order.setUid(event.get("uid").toString());

    // QUERY PRODUCTSTOCK TO CHECK AVAILABILITY
    Integer productStock = getProductStock(order.getProductId());

    if(productStock > 0) {
        Map<String, Object> event = new HashMap<>();
        event.put("name", "ProductReserved");
        event.put("orderId", order.getId());
        event.put("productId", order.getProductId());

        // WRITES A PRODUCT RESERVED EVENT TO orders topic
        orderProcessor.output().send(MessageBuilder.withPayload(event).build(), 500);
    }else{
        //XXX CANCEL ORDER
    }
}

ProductService 还有一个负责更新股票的Kafka Streams处理器:

KStream<Integer, JsonNode> stream = kStreamBuilder.stream(integerSerde, jsonSerde, "orders");
stream.xxx().yyy(() -> {...}, "ProductsStock");

将首先处理 Event1 ,因为仍有1个可用产品,它将生成 ProductReserved 事件 .

现在,轮到_616349了 . 如果由 ProductService consumer 消耗 ProductService Kafka Streams Processor 处理由 Event1 生成的ProductReseved事件,则消费者仍会看到product1的ProductStore库存为1,从而为Event2生成ProductReserved事件,然后在系统中产生不一致 .

2 回答

  • 2

    对于你原来的问题,这个答案有点晚了,但是我还是要回答完整性 .

    有很多方法可以解决这个问题,但我鼓励解决这个问题是一种事件驱动的方式 . 这意味着您(a)验证有足够的库存来处理订单,以及(b)将库存保留为单一库存,所有这些都在一个KStreams操作中 . 诀窍是通过productId重新生成密钥,这样您就知道同一产品的订单将在同一个线程上顺序执行(因此您无法进入Order1和Order2两次保留同一产品库存的情况) .

    有一篇帖子讨论了如何做到这一点:https://www.confluent.io/blog/building-a-microservices-ecosystem-with-kafka-streams-and-ksql/

    也许更有用的是有一些示例代码也显示了如何完成它:https://github.com/confluentinc/kafka-streams-examples/blob/1cbcaddd85457b39ee6e9050164dc619b08e9e7d/src/main/java/io/confluent/examples/streams/microservices/InventoryService.java#L76

    请注意,在此KStreams代码中,第一行如何重新生成productId,然后使用Transformer来(a)验证是否有足够的库存来处理订单 and (b)通过更新状态存储来保留所需的库存 . 这是使用Kafka的交易功能以原子方式完成的 .

  • 2

    同样的问题在确保任何分布式系统的一致性方面是典型的 . 通常使用流程管理器/传奇模式,而不是强烈的一致性 . 这有点类似于分布式事务中的两阶段提交,但在应用程序代码中明确实现 . 它是这样的:

    订购服务要求产品服务部门保留N个项目 . 产品服务接受命令并减少库存或拒绝该命令(如果没有足够的可用项目) . 在对命令作出肯定回复后,Order Service现在可以发出OrderCreated事件(虽然我称之为OrderPlaced,因为“放置”声音模式是域惯用语,“created”更通用,但这是一个细节) . 产品服务要么侦听OrderPlaced事件,要么向其发送显式ConfirmResevation命令 . 或者,如果发生了其他事情(例如,未能清除资金),则可以发出适当的事件或明确发送CancelReservation命令到ProductService . 为了满足特殊情况,ProductService还可能有一个调度程序(在KafkaStreams标点符号中可以派上用场)取消在超时期限内未确认或中止的预订 .

    两种服务的编排和处理错误条件以及补偿操作(在这种情况下取消预留)的技术性可以直接在服务中处理,或者在显式的Process Manager组件中处理,以分离此责任 . 就个人而言,我会选择一个可以使用Kafka Streams Processor API实现的显式流程管理器 .

相关问题