首页 文章

Kafka KStream - 使用带窗口的AbstractProcessor

提问于
浏览
1

我希望将来自KStream的窗口批输出组合在一起并将它们写入辅助存储 .

我期待看到 .punctuate() 大约每30秒调用一次 . 我得到的是保存here .

(原始文件长达数千行)

总结 - .punctuate() 似乎随机地被调用,然后反复调用 . 它似乎不符合通过ProcessorContext.schedule()设置的值 .


编辑:

另一次运行相同的代码大约每四分钟调用 .punctuate() . 这次我没有看到疯狂的重复值 . 来源没有变化 - 只是结果不同 .

使用以下代码:

主要

StreamsConfig streamsConfig = new StreamsConfig(config);
KStreamBuilder kStreamBuilder = new KStreamBuilder();
KStream<String, String> lines = kStreamBuilder.stream(TOPIC);

lines.process(new BPS2());

KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, streamsConfig);

kafkaStreams.start();

处理器

public class BP2 extends AbstractProcessor<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BP2.class);

    private ProcessorContext context;
    private final long delay;
    private final ArrayList<String> values;

    public BP2(long delay) {
        LOGGER.debug("BatchProcessor() constructor");
        this.delay = delay;

       values = new ArrayList<>();

    }

    @Override
    public void process(String s, String s2) {
        LOGGER.debug("batched processor s:{}   s2:{}", s, s2);

        values.add(s2);
    }

    @Override
    public void init(ProcessorContext context) {
        LOGGER.info("init");

        super.init(context);

        values.clear();

        this.context = context;
        context.schedule(delay);
    }

    @Override
    public void punctuate(long timestamp) {
        super.punctuate(timestamp);

        LOGGER.info("punctuate   ts: {}   count: {}", timestamp, values.size());

        context().commit();
    }
}

ProcessorSupplier

public class BPS2 implements ProcessorSupplier<String, String> {
    private static final Logger LOGGER = LoggerFactory.getLogger(BPS2.class);

    @Override
    public Processor<String, String> get() {
        try {
            return new BP2(30000);
        } catch(Exception exception) {
            LOGGER.error("Unable to instantiate BatchProcessor()", exception);
            throw new RuntimeException();
        }
    }
}

编辑:

为了确保我的调试器没有't slowing this down I built it and ran it on the same box as my kafka process. This time it didn'甚至尝试滞后4分钟或更长时间 - 在几秒钟内它就输出了对 .punctuate() 的虚假调用 . 其中许多(大多数)没有干预电话 .process() .

3 回答

  • 0

    Update: this part of the answer is for Kafka version 0.11 or earlier (for Kafka 1.0 and later see below)

    在Kafka Streams中,标点符号基于 stream-timenot 系统时间(也称为处理时间) .

    默认 stream-timeevent-time ,即Kafka记录中嵌入的时间戳 . 由于您未设置非默认 TimestampExtractor (请参阅http://docs.confluent.io/current/streams/developer-guide.html#optional-configuration-parameters中的 timestamp.extractor ),因此对 punctuate 的调用仅取决于您处理的记录的事件时间过程 . 因此,如果您需要多分钟来处理记录的"30 seconds"(事件时间), punctuate 将被频繁调用,而不是30秒(挂钟时间)......

    这也可以解释您的不规则呼叫模式(即突发和长延迟) . 如果您的数据事件时间确实"jump",并且您要处理的数据已在您的主题中完全可用,则Kafka Streams也会"jumps"关于内部维护的 stream-time .

    我认为,您可以使用 WallclockTimestampExtractor 来解决您的问题(请参阅http://docs.confluent.io/current/streams/developer-guide.html#timestamp-extractor

    还有一点需要提及: stream-time 仅在处理数据时才会提前 - 如果您的应用程序到达输入主题的末尾并等待数据,则不会调用 punctuate . 即使您使用 WallclockTimestampExtractor ,这也适用 .

    顺便说一下:目前有关于Streams标点符号行为的讨论:https://github.com/apache/kafka/pull/1689

    Answer for Kafka 1.0 and later

    从Kafka 1.0开始,可以根据挂钟时间或事件时间注册标点符号:https://kafka.apache.org/10/documentation/streams/developer-guide/processor-api.html#id2

  • 0

    刚读完了this question的答案,我认为这也是你的答案 . 它的要点是:

    • 流消费者对记录执行轮询

    • 完全处理所有返回的记录 .

    • 然后使用配置的延迟安排标点回调 .

    指出标点符号不是固定的时间间隔事件,并且#2采用的时间的变化将导致标点符号的执行周期的等效变化 .

    ....但是读了那个链接,他说它比我好 .

  • 6

    好的 - 我认为这是 Kafka 的一个错误 .

    原因如下:

    在我原来的测试中,我使用一台机器来运行 ProducerConsumer . 我会运行Producer几分钟来生成一些测试数据,然后运行我的测试 . 这会给我最初发布的奇怪输出 .

    然后我决定将Producer推到后台并让它继续运行 . 现在,在 .punctuate() 的调用之间,我看到100%完美的30秒间隔 . 没有更多的问题 .

    换句话说 - 如果kafka服务器不是't processing any inbound data then it doesn' t似乎与运行 KStreams 进程一致 .

相关问题