首页 文章

暗淡的kafka stream groupBy和window

提问于
浏览
0

我无法理解groupBy / groupById的概念和kafka流中的窗口化 . 我的目标是在一段时间(例如5秒)内聚合流数据 . 我的流媒体数据类似于:

{"value":0,"time":1533875665509}
{"value":10,"time":1533875667511}
{"value":8,"time":1533875669512}

时间以毫秒(纪元)为单位 . 在这里,我的时间戳在我的消息中,而不是在密钥中 . 我想平均5秒窗口的值 .

这是我正在尝试的代码,但似乎我无法让它工作

builder.<String, String>stream("my_topic")
   .map((key, val) -> { TimeVal tv = TimeVal.fromJson(val); return new KeyValue<Long, Double>(tv.time, tv.value);})
   .groupByKey(Serialized.with(Serdes.Long(), Serdes.Double()))
   .windowedBy(TimeWindows.of(5000))
   .count()
   .toStream()
   .foreach((key, val) -> System.out.println(key + " " + val));

即使主题每两秒生成一次消息,此代码也不会打印任何内容 . 当我按下Ctrl C然后打印出类似的东西

[1533877059029@1533877055000/1533877060000] 1
[1533877061031@1533877060000/1533877065000] 1
[1533877063034@1533877060000/1533877065000] 1
[1533877065035@1533877065000/1533877070000] 1
[1533877067039@1533877065000/1533877070000] 1

这个输出对我来说没有意义 .

相关代码:

public class MessageTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record,  long previousTimestamp) {
        String str = (String)record.value();
        TimeVal tv = TimeVal.fromJson(str);
        return tv.time;
    }
}

public class TimeVal
{
    final public long time;
    final public double value;
    public TimeVal(long tm, double val) {
        this.time = tm;
        this.value = val;
    }
   public static TimeVal fromJson(String val) {
       Gson gson = new GsonBuilder().create();
       TimeVal tv = gson.fromJson(val, TimeVal.class);
       return tv;
   }
}

问题:

为什么需要将serializer / deserializer传递给group by . 一些重载也采用ValueStore,那是什么?分组后,数据在分组流中的显示方式?

窗口流如何与组流相关?

以上,我期待以流媒体方式打印 . 这意味着每5秒缓冲一次,然后计数然后打印 . 它只打印一次在命令提示符下按Ctrl键,即打印然后退出

2 回答

  • 0

    看起来你误解了窗口DSL的本质 .

    它适用于kafka平台处理的内部消息时间戳,而不适用于编码时间信息的特定消息类型中的任意属性 . 此外,此窗口不会分组为间隔 - 它是一个滑动窗口 . 这意味着您获得的任何聚合都是当前消息之前的最后5秒 .

    此外,您需要将所有组元素的相同键组合到同一组中,例如 null . 在您的示例中, key 是一个时间戳,它是一种条目唯一的,因此组中只有一个元素 .

  • 1

    您的输入数据中似乎没有密钥(如果这是错误的,请更正我),而且您还想进行全局聚合?

    通常,分组用于将流分成子流 . 这些子流由密钥构建(即,每个密钥一个逻辑子流) . 您将时间戳设置为代码段中的键,从而为每个时间戳生成一个子流 . 我认为这不是故意的 .

    如果要进行全局聚合,则需要将所有记录映射到单个子流,即将相同的键分配给 groupBy() 中的所有记录 . 请注意,全局聚合不会扩展,因为聚合必须由单个线程计算 . 因此,这仅适用于小型工作负载 .

    窗口化应用于每个生成的子流以构建窗口,并且每个窗口计算聚合 . 窗口是基于 Timestamp 提取器返回的时间戳构建的 . 您似乎已经有一个实现,它已经为此目的提取值的时间戳 .

    即使主题每两秒生成一次消息,此代码也不会打印任何内容 . 当我按下Ctrl C然后打印出类似的东西

    默认情况下,Kafka Streams使用一些内部缓存,缓存将在提交时刷新 - 默认情况下每30秒发生一次,或者当您停止应用程序时 . 您需要禁用缓存以更早地查看结果(参见https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html

    为什么需要将serializer / deserializer传递给group by .

    因为需要重新分配数据,这是通过Kafka中的主题实现的 . 请注意,Kafka Streams是为分布式设置构建的,同一应用程序的多个实例并行运行以横向扩展 .

    顺便说一句:我们在这篇关于Kafka Streams执行模型的博文中也可能会感兴趣:https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/

相关问题