首页 文章

Kapacitor:通过连接计算两个流之间的差异

提问于
浏览
1

完全披露:我也发表了这个问题的变体here .

我有一个嵌入式设备作为加热系统的一部分,每隔5秒通过一个mosquitto MQTT代理发布两个温度值,每个温度值到一个单独的MQTT主题 . "mydevice/sensor1"是预热温度,"mydevice/sensor2"是后加热温度 . 这些值几乎同时发布,因此's typically never more than half a second of delay between the two messages - but they aren't完全同步 .

Telegraf订阅了同一个经纪人,并乐于将这些测量结果放入名为“telegraf.autogen”的InfluxDB数据库中 . 这些测量值都出现在名为“mqtt_consumer”的单个测量值下,其中一个字段称为“值” . 在InfluxDB中,我可以通过使用“topic”标记进行过滤来区分主题标记值:

SELECT mean("value") AS "mean_value" FROM "telegraf"."autogen"."mqtt_consumer" WHERE time > now() - 1m AND "topic"='mydevice/sensor1' GROUP BY time(5s)

这一切似乎都正常工作 .

我想要做的是为每对输入值计算这两个主题值之间的 difference ,以便计算温差并最终计算加热系统传输的能量(流速恒定且已知) . 我尝试用Grafana中的InfluxDB查询来做这个,但是看起来很困难(我失败了),所以我想我会尝试使用TICKscript将我的过程分解成小步骤 .

我一直在整理一个TICKscript来计算基于这个例子的差异:

https://docs.influxdata.com/kapacitor/v1.3/guides/join_backfill/#stream-method

然而,在我来说,我使用 httpOut 生成用于调试(撇开查看Don 't have two separate measurements. Instead, I create two separate streams from the single 1835297 measurement, using the topic tag as a filter. Then I attempt to join these with a 1s tolerance (values are always published close enough in time). I' L:这只是更新每隔10秒,缺少每一秒值,即使我流在5秒的时间间隔运行 - 这是为什么我可以看到?新的数据库,但值都存在) .

一旦我将它们加入,我将评估值的差异,并将其存储在名为“diff”的测量下的新数据库中 .

到目前为止这是我的脚本:

var sensor1 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor1')
        .groupBy(*)
    |httpOut('sensor1')

var sensor2 = stream
    |from()
        .database('telegraf')
        .retentionPolicy('autogen')
        .measurement('mqtt_consumer')
        .where(lambda: "topic" == 'mydevice/sensor2')
        .groupBy(*)
    |httpOut('sensor2')

sensor1
    |join(sensor2)
        .as('value1', 'value2')
        .tolerance(1s)
    |httpOut('join')
    |eval(lambda: "sensor1.value1" - "sensor1.value2")
        .as('diff')
    |httpOut('diff')
    |influxDBOut()
        .create()
        .database('mydb')
        .retentionPolicy('myrp')
        .measurement('diff')

不幸的是,我的脚本无法通过 join 节点传递任何项目 . 在 kapacitor show 中,我可以看到 httpOut 节点都将项目传递给 join 节点,但它也没有显示任何明显的节点 . httpOut('join') 的HTTP GET返回:

{"series":null}

我有两个问题:

  • 是这种方法,使用Kapacitor和TICKscript根据单个测量中两个值之间的差异计算能量,有效吗?或者有更好/更简单的方法吗?

  • 为什么 join 节点不产生任何输出?我该怎么做才能进一步调试?

1 回答

  • 0

    尝试在两个传感器中添加| mean node,以计算字段的平均值:

    var sensor1 = stream
        |from()
            .database('telegraf')
            .retentionPolicy('autogen')
            .measurement('mqtt_consumer')
            .where(lambda: "topic" == 'mydevice/sensor1')
            .groupBy(*)
        |mean('field1')
        |httpOut('sensor1')
    

    在连接之后,您应该将新分配的名称用于流,也不应使用原始名称:

    sensor1
        |join(sensor2)
            .as('value1', 'value2')
            .tolerance(1s)
        |httpOut('join')
        |eval(lambda: "value1.field1" - "value2.field2")
            .as('diff')
        |httpOut('diff')
        |influxDBOut()
            .create()
            .database('mydb')
            .retentionPolicy('myrp')
            .measurement('diff')
    

    其中平均字段是根据我之前的评论计算的字段 . 试试看!

    此外,要进一步调试,请尝试将日志节点添加到您想要关注的位置 .

    希望这可以帮助!问候

相关问题