首页 文章

直接写到 Kafka 国营商店

提问于
浏览
1

我们已经开始尝试使用Kafka来查看它是否可以用于聚合我们的应用程序数据 . 我认为我们的用例与Kafka流匹配,但我们不确定我们是否正确使用该工具 . 我们构建的概念证明似乎按设计工作,我不确定我们是否正在使用API .

我们的概念证明是使用kafka流来保持输出主题中关于程序的信息的运行记录,例如,

{ 
  "numberActive": 0, 
  "numberInactive": 0, 
  "lastLogin": "01-01-1970T00:00:00Z"  
}

计算计数很简单,它基本上是根据输入主题和输出字段执行比较和交换(CAS)操作 .

本地状态包含给定密钥的最新程序 . 我们加入一个针对状态存储的输入流,并使用TransformSupplier运行CAS操作,该转换器显式地使用TransformSupplier将数据写入状态存储

context.put(...)
context.commit();

这是否适合使用当地的国营商店?还有另一种方法可以在主题中保持有状态的运行记录吗?

1 回答

  • 3

    您的设计听起来对我(我假设您使用的是PAPI而不是Streams DSL),您正在一个流中读取,在状态存储与运营商关联的流上调用transform() . 由于您的更新逻辑似乎只是依赖于密钥,因此可以通过基于密钥分区的Streams库进行令人尴尬的并行化 .

    有一点需要注意的是,在每次看跌期权之后,您似乎都在调用"context.commit()",这不是推荐的模式 . 这是因为 commit() 操作是一个非常繁重的调用,涉及刷新状态存储,向Kafka代理发送提交偏移请求等,在每次调用时调用它将导致非常低的吞吐量 . 建议仅在处理一堆记录后才调用commit(),或者您可以依赖Streams config "commit.interval.ms"依赖Streams库在每个时间间隔之后仅在内部调用commit() . 请注意,这不会影响正常关闭时的处理语义,因为在关闭时Streams将始终强制执行commit()调用 .

相关问题