我正在使用dotnet和ksql进行PoC . https://github.com/pablocastilla/kafkiano/
总的想法是看看我是否可以使用KSQL实现业务逻辑 . 在示例中,我介绍了库存中的设备并从中发出订单 . 这个例子包括:
两个主要流:
-
库存流接收库存的添加事件 .
-
订单流接收产品订单 .
使用这些流我创建了两个表:
-
产品库存:它只是将产品添加到库存中
-
订单:按产品计算订单
在这两个表之后,我创建了另一个表,其中包含订单和库存中的产品之间的差异,只是为了知道是否还有产品 .
通过最后一个表和订单流的连接,我可以在处理该订单时留下库存 .
我使用productname作为密钥来介绍事件 . 到目前为止,它在我的机器上运行良好,但我的问题是:
-
这在大型 生产环境 环境中是否一致?我想知道在并行接收大量事件时何时破坏一致性的限制 .
-
如何知道哪些查询先于其他查询执行?在我加入与订单流的差异之前,我需要计算库存和订单之间的差异
谢谢
KSQL:
//INVENTORY STREAMS
CREATE STREAM InventoryEventsStream (ProductName VARCHAR, Quantity INT) WITH (kafka_topic='INVENTORYEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsStock as select ProductName,sum(Quantity) as Stock from InventoryEventsStream group by ProductName;
// ORDERS STREAMS
CREATE STREAM OrdersCreatedStream (ProductName VARCHAR,Quantity INT, OrderId VARCHAR, User VARCHAR) WITH (kafka_topic='ORDERSEVENTS', key='ProductName', value_format='json');
//TABLE GROUPING BY PRODUCT
CREATE TABLE ProductsOrdered as select ProductName as ProductName,sum(Quantity) as Orders from ORDERSCREATEDSTREAM group by ProductName;
// join with the difference
CREATE TABLE StockByProductTable AS SELECT ps.ProductName as ProductName,ps.Stock - op.Orders as Stock FROM PRODUCTSORDERED op JOIN ProductsStock ps ON op.ProductName = ps.ProductName;
//logic: I want the stock left when I make an order
SELECT ocs.OrderId,ocs.User,sbpt.Stock FROM OrdersCreatedStream ocs JOIN StockByProductTable sbpt ON sbpt.ProductName = ocs.ProductName;