-
我想根据创建消息的时间戳批处理消息 .
-
此外,我想在固定时间窗口(1分钟)内批量处理这些消息 .
-
只有在窗口通过后,才能向下游推送批次 .
为了实现这一点,处理器API似乎或多或少适合(la KStream batch process windows):
public void process(String key, SensorData sensorData) {
//epochTime is rounded to our prefered time window (1 minute)
long epochTime = Sensordata.epochTime;
ArrayList<SensorData> data = messageStore.get(epochTime);
if (data == null) {
data = new ArrayList<SensorData>();
}
data.add(sensorData);
messageStore.put(id, data);
this.context.commit();
}
@Override
public void init(ProcessorContext context) {
this.context = context;
this.context.schedule(60000); // equal to 1 minute
}
@Override
public void punctuate(long streamTime) {
KeyValueIterator<String, ArrayList<SensorData>> it = messageStore.all();
while (it.hasNext()) {
KeyValue<String, ArrayList<SensorData>> entry = it.next();
this.context.forward(entry.key, entry.value);
}
//reset messageStore
}
但是,这种方法有一个主要缺点:我们不使用Kafka Streams窗口 .
不考虑
-
无序消息 .
-
当实时操作时,标点符号表应等于所需的批次时间窗口 . 如果我们将其设置为short,则将转发批处理并且下游计算将快速开始 . 如果设置为long,并且批处理窗口尚未完成时触发标点符号,则同样存在问题 .
-
,在保持标点符号计划(1分钟)的同时重放历史数据将仅在1分钟后触发第一次计算 . 如果是这样,那会炸毁国营商店,也会感觉不对劲 .
考虑到这些要点,我应该使用Kafka Streams窗口 . 但这只能在Kafka Streams DSL中实现......
任何艰难的事情都会很棒 .
1 回答
你可以在DSL中使用
process()
,transform()
或transformValues()
混合搭配DSL和处理器API(还有其他一些关于这个的问题,所以我没有进一步详述) . 因此,您可以将常规窗口构造与自定义(下游)运算符结合使用,以保留结果(并进行重复数据删除) . 一些duduplication将在你的window-operator中自动发生(从Kafka0.10.1
开始;参见http://docs.confluent.io/current/streams/developer-guide.html#memory-management)但是如果你想要只有一个结果,那么缓存将不会为你做 .关于标点符号:它是基于进度(即流时间)触发的,而不是基于挂钟时间 - 因此,如果您重新处理旧数据,将调用与原始运行完全相同的次数(仅如果你在更快地处理旧数据时考虑挂钟时间,那么彼此之后会更快 . 如果你想获得更多细节,还有一些关于这个的问题 .
但是,我一般考虑:为什么你只需要一个结果?如果进行流处理,您可能希望构建下游使用者应用程序,以便能够处理结果的更新 . 这是Kafka的固有设计:使用更改日志 .