我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储 .
我期待看到 .punctuate()
大约每30秒调用一次 . 我得到的是保存here .
(原始文件长达数千行)
总结 - .punctuate()
似乎随机地被调用,然后反复调用 . 它似乎不符合通过ProcessorContext.schedule()设置的值 .
编辑:
另一次运行相同的代码大约每四分钟调用 .punctuate()
. 这次我没有看到疯狂的重复值 . 来源没有变化 - 只是结果不同 .
使用以下代码:
主要
StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);
lines.process(new BPS2());
KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);
kafkaStreams.start();
处理器
public class BP2 extends AbstractProcessor<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);
private ProcessorContext context;
private final long delay;
private final ArrayList<String> values;
public BP2(long delay) {
LOGGER.debug("BatchProcessor() constructor");
this.delay = delay;
values = new ArrayList<>();
}
@Override
public void process(String s, String s2) {
LOGGER.debug("batched processor s:{} s2:{}", s, s2);
values.add(s2);
}
@Override
public void init(ProcessorContext context) {
LOGGER.info("init");
super.init(context);
values.clear();
this.context = context;
context.schedule(delay);
}
@Override
public void punctuate(long timestamp) {
super.punctuate(timestamp);
LOGGER.info("punctuate ts: {} count: {}", timestamp, values.size());
context().commit();
}
}
ProcessorSupplier
public class BPS2 implements ProcessorSupplier<String, String> {
private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);
@Override
public Processor<String, String> get() {
try {
return new BP2(30000);
} catch(Exception exception) {
LOGGER.error("Unable to instantiate BatchProcessor()", exception);
throw new RuntimeException();
}
}
}
编辑:
为了确保我的调试器没有't slowing this down I built it and ran it on the same box as my kafka process. This time it didn'甚至尝试滞后4分钟或更长时间 - 在几秒钟内它就输出了对 .punctuate()
的虚假调用 . 其中许多(大多数)没有干预电话 .process()
.
3 回答
Update: this part of the answer is for Kafka version 0.11 or earlier (for Kafka 1.0 and later see below)
在Kafka Streams中,标点符号基于 stream-time 和 not 系统时间(也称为处理时间) .
默认 stream-time 是 event-time ,即Kafka记录中嵌入的时间戳 . 由于您未设置非默认
TimestampExtractor
(请参阅http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters中的timestamp.extractor
),因此对punctuate
的调用仅取决于您处理的记录的事件时间过程 . 因此,如果您需要多分钟来处理记录的"30 seconds"(事件时间),punctuate
将被频繁调用,而不是30秒(挂钟时间)......这也可以解释您的不规则呼叫模式(即突发和长延迟) . 如果您的数据事件时间确实"jump",并且您要处理的数据已在您的主题中完全可用,则Kafka Streams也会"jumps"关于内部维护的 stream-time .
我认为,您可以使用
WallclockTimestampExtractor
来解决您的问题(请参阅http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor)还有一点需要提及: stream-time 仅在处理数据时才会提前 - 如果您的应用程序到达输入主题的末尾并等待数据,则不会调用
punctuate
. 即使您使用WallclockTimestampExtractor
,这也适用 .顺便说一下:目前有关于Streams标点符号行为的讨论:https://github.com/apache/kafka/pull/1689
Answer for Kafka 1.0 and later
从Kafka 1.0开始,可以根据挂钟时间或事件时间注册标点符号:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2
刚读完了this question的答案,我认为这也是你的答案 . 它的要点是:
流消费者对记录执行轮询
完全处理所有返回的记录 .
然后使用配置的延迟安排标点回调 .
指出标点符号不是固定的时间间隔事件,并且#2采用的时间的变化将导致标点符号的执行周期的等效变化 .
....但是读了那个链接,他说它比我好 .
好的 - 我认为这是 Kafka 的一个错误 .
原因如下:
在我原来的测试中,我使用一台机器来运行 Producer 和 Consumer . 我会运行Producer几分钟来生成一些测试数据,然后运行我的测试 . 这会给我最初发布的奇怪输出 .
然后我决定将Producer推到后台并让它继续运行 . 现在,在
.punctuate()
的调用之间,我看到100%完美的30秒间隔 . 没有更多的问题 .换句话说 - 如果kafka服务器不是't processing any inbound data then it doesn' t似乎与运行 KStreams 进程一致 .