首页 文章

如何重新处理批量Kafka流

提问于
浏览
2
  • 我想根据创建消息的时间戳批处理消息 .

  • 此外,我想在固定时间窗口(1分钟)内批量处理这些消息 .

  • 只有在窗口通过后,才能向下游推送批次 .

为了实现这一点,处理器API似乎或多或少适合(la KStream batch process windows):

public void process(String key, SensorData sensorData) {
    //epochTime is rounded to our prefered time window (1 minute)
    long epochTime = Sensordata.epochTime;

    ArrayList<SensorData> data = messageStore.get(epochTime);
    if (data == null) {
        data = new ArrayList<SensorData>();
    }

    data.add(sensorData);
    messageStore.put(id, data);
    this.context.commit();
}

@Override
public void init(ProcessorContext context) {
    this.context = context;
    this.context.schedule(60000); // equal to 1 minute
}

@Override
public void punctuate(long streamTime) {
    KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
    while (it.hasNext()) {
        KeyValue<String, ArrayList<SensorData>> entry = it.next();
        this.context.forward(entry.key, entry.value);
    }
    //reset messageStore
}

但是,这种方法有一个主要缺点:我们不使用Kafka Streams窗口 .

不考虑

  • 无序消息 .

  • 当实时操作时,标点符号表应等于所需的批次时间窗口 . 如果我们将其设置为short,则将转发批处理并且下游计算将快速开始 . 如果设置为long,并且批处理窗口尚未完成时触发标点符号,则同样存在问题 .

  • ,在保持标点符号计划(1分钟)的同时重放历史数据将仅在1分钟后触发第一次计算 . 如果是这样,那会炸毁国营商店,也会感觉不对劲 .

考虑到这些要点,我应该使用Kafka Streams窗口 . 但这只能在Kafka Streams DSL中实现......

任何艰难的事情都会很棒 .

1 回答

  • 1

    你可以在DSL中使用 process()transform()transformValues() 混合搭配DSL和处理器API(还有其他一些关于这个的问题,所以我没有进一步详述) . 因此,您可以将常规窗口构造与自定义(下游)运算符结合使用,以保留结果(并进行重复数据删除) . 一些duduplication将在你的window-operator中自动发生(从Kafka 0.10.1 开始;参见http://docs.confluent.io/current/streams/developer-guide.html#memory-management)但是如果你想要只有一个结果,那么缓存将不会为你做 .

    关于标点符号:它是基于进度(即流时间)触发的,而不是基于挂钟时间 - 因此,如果您重新处理旧数据,将调用与原始运行完全相同的次数(仅如果你在更快地处理旧数据时考虑挂钟时间,那么彼此之后会更快 . 如果你想获得更多细节,还有一些关于这个的问题 .

    但是,我一般考虑:为什么你只需要一个结果?如果进行流处理,您可能希望构建下游使用者应用程序,以便能够处理结果的更新 . 这是Kafka的固有设计:使用更改日志 .

相关问题