我正在使用dotnet和ksql进行PoC . https://github.com/pablocastilla/kafkiano/

总的想法是看看我是否可以使用KSQL实现业务逻辑 . 在示例中,我介绍了库存中的设备并从中发出订单 . 这个例子包括:

两个主要流:

  • 库存流接收库存的添加事件 .

  • 订单流接收产品订单 .

使用这些流我创建了两个表:

  • 产品库存:它只是将产品添加到库存中

  • 订单:按产品计算订单

在这两个表之后,我创建了另一个表,其中包含订单和库存中的产品之间的差异,只是为了知道是否还有产品 .

通过最后一个表和订单流的连接,我可以在处理该订单时留下库存 .

我使用productname作为密钥来介绍事件 . 到目前为止,它在我的机器上运行良好,但我的问题是:

  • 这在大型 生产环境 环境中是否一致?我想知道在并行接收大量事件时何时破坏一致性的限制 .

  • 如何知道哪些查询先于其他查询执行?在我加入与订单流的差异之前,我需要计算库存和订单之间的差异

谢谢

KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');

//TABLE GROUPING BY PRODUCT
CREATE TABLE  ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;

// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from  ORDERSCREATEDSTREAM group by ProductName;

// join with the difference
CREATE TABLE StockByProductTable  AS  SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;

//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN  StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;