首页 文章

如何每小时汇总数据?

提问于
浏览
2

每当用户在我们网站上收集某些内容时,我们都会收集事件,而我们计划要做的是每小时提交内容的聚合收藏并更新数据库中的最喜欢的数量 .

我们正在评估Kafka Streams . 遵循单词计数示例 . 我们的拓扑很简单,生成主题A并读取聚合数据并将其提交给另一个主题B.然后每小时使用主题B中的事件并在数据库中提交 .

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
   public StreamsConfig kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "favorite-streams");
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class.getName());
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokerAddress);
    return new StreamsConfig(props);
}

@Bean
public KStream<String, String> kStream(StreamsBuilder kStreamBuilder) {
    StreamsBuilder builder = streamBuilder();
    KStream<String, String> source = builder.stream(topic);
    source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
            .groupBy((key, value) -> value)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store")).toStream()
            .to(topic + "-grouped", Produced.with(Serdes.String(), Serdes.Long()));

    Topology topology = builder.build();
    KafkaStreams streams = new KafkaStreams(topology, kStreamsConfigs());
    streams.start();
    return source;
}

@Bean
public StreamsBuilder streamBuilder() {
    return new StreamsBuilder();
}

但是当我使用这个主题B时,它从头开始给我汇总的数据 . 我的问题是,我们可以有一些条款,其中我可以使用前几个小时的分组数据,然后提交到DB,然后Kakfa忘记前几个小时的数据并每小时提供新数据而不是累积总和 . 设计拓扑是正确的还是我们可以做得更好?

1 回答

相关问题