以下是我们的 IoT 平台中的入站消息流:
Device ---(MQTT)---> RabbitMQ Broker ---(AMQP)---> Apache Storm ---> Kafka
我希望实现一个有效 limits/throttles 每秒发布到 Kafka 的数据量的解决方案。
当前的策略使用 Guava 的 RateLimiter,其中每个设备都有自己的本地缓存实例。收到设备消息时,将从缓存中获取映射到该 deviceId 的 RateLimiter,并调用tryAquire()
方法。如果成功获得许可证,则会像往常一样将元组转发给 Kafka,否则会超出配额并且会默默地丢弃消息。这种方法相当麻烦,并且在某些时候注定要失败或成为瓶颈。
我一直在阅读 Kafka 的 byte-rate 配额,并相信这在我们的情况下会完美运行,特别是因为 Kafka 客户端可以动态配置。在我们的平台中创建虚拟设备时,应在client.id == deviceId
处添加新的 client.id。
我们假设以下用例为例:
-
管理员创建 2 个虚拟设备:湿度和温度传感器
-
触发规则以在 Kafka 中为上述设备创建新的 user/clientId 条目
-
通过 Kafka CLI 设置他们的生产者配额值
-
两个设备都发出入站事件消息
-
...?
这是我的问题。如果使用单个 Producer 实例,是否可以在调用send()
之前在 ProducerRecord 或 Producer 中的某个位置指定client.id
?如果生产者只允许一个client.id
,这是否意味着每个设备必须有自己的生产者?如果只允许 one-to-one 映射,那么为每个设备缓存可能有数百个(如果不是数千个)生产者实例是明智的吗?还有一种我不知道的更好的方法吗?
注意:我们的平台是一个“开门系统”,意味着客户永远不会收到错误响应,例如“超出费率”或任何错误。这对最终用户来说都是透明的。出于这个原因,我不能干扰 RabbitMQ 中的数据或 re-route 消息到不同的队列..我唯一的选择是集成这些东西位于 Storm 或 Kafka 之间。
2 回答
虽然您可以在
Producer
对象上指定client.id
,但请记住它们是重量级的,并且您可能不愿意为它们创建多个实例(特别是在 one-per-device 基础上)。关于减少
Producer
的数量,你考虑过在 per-user 而不是 per-device 基础上创建一个,或者甚至有一个有限的共享池吗?然后可以使用 Kafka 消息头来辨别实际生成数据的设备。缺点是您需要限制消息生成,以便一个设备不会从其他设备获取所有资源。但是,您可以限制 Kafka 代理端的用户,并将配置应用于默认 user/client:
有关更多示例和深入解释,请参阅https://kafka.apache.org/documentation/#design_quotas。
如何识别消息取决于您的架构,可能的解决方案包括:
每个用户的主题/分区(e.g.
data-USERABCDEF
)如果您决定使用常见主题,那么您可以将生产者数据放入消息标题 - https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/common/header/Headers.html,或者您可以将它们放入有效负载本身
您可以按应用程序配置
client.id
:properties.put ("client.id", "humidity")
或properties.put ("client.id", "temp")
根据每个client.id
您可以设置值怀疑我与此配置(
producer_byte_rate = 1024, consumer_byte_rate = 2048, request_percentage = 200
)有关,生产者不承担插入的配置,因为消费者正常工作