I am using the Kafka Streams API, and I'm not sure whether the behavior I'm seeing is a bug or something I'm just not understanding correctly.

Context

Let me preface this by saying I'm new to stream processing. I have some data that is currently batch-processed and would like to feed it into a streaming system. The caveat is that I can't currently add events to the streams cluster directly - they have to go through an intermediate system that stores the records and loads them into a database (this is where I would add a Kafka producer).

Additionally, the intermediate system is a cluster of machines that a load balancer round-robins across. This means that if one of the machines runs into trouble, records can be received late. Because of the latency and ordering issues, I want to use event-time semantics by specifying my own custom timestamp extractor. Each record has an embedded timestamp.

I am using Kafka 0.11.0.1.

The Problem

I find the Streams DSL very attractive because of how easy it makes implementing windowed aggregation functions with the built-in state stores. Everything works fine while the application is running. However, when restarting the application, the state of the aggregations seems to be lost - with a count, for example, the count starts over from 0 instead of resuming from the last value. I believe it is related to the combination of custom timestamps and windowed aggregation functions, because it works as expected when using the built-in timestamps.

Sample application (Java 8)

Having a push-button example is difficult, so this takes a bit of setup. Running it requires a dev Kafka server with the kafka console commands available. This application simply counts records... the value of each record is its timestamp. The behavior I mentioned can be reproduced as follows:

  • Start the producer and publish the following message: "Test:1"

kafka-console-producer --broker-list 127.0.0.1:9092 --property "parse.key=true" --property "key.separator=:" --topic test-topic

  • Start the consumer (note: it consumes the output topic, which contains the record counts).

kafka-console-consumer --bootstrap-server 127.0.0.1:9092 --from-beginning --property print.key=true --property key.separator=":" --value-deserializer org.apache.kafka.common.serialization.LongDeserializer --topic test-output

  • Run the application (I prefer to use Gradle to execute installDist and run it from the build directory).

  • Send more messages (the values must fall in the same window): "Test:2", "Test:3". The output should show a count of 3. If you like, you can send "Test:501", which will show a count of 1 since it falls in a different window.

  • Kill the application (Ctrl+C in the command window).

  • Restart the application.

  • Send "Test:4". It will output a count of 1, meaning there is only one record in that time window, even though there should be 4.
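For reference, here is a sketch of how the timestamps in the steps above map onto the tumbling windows - this is just the window-start arithmetic implied by TimeWindows.of(500), not the actual Streams internals:

```java
public class WindowAssignment {
    // Tumbling windows of size `sizeMs`: a timestamp t falls into the
    // window starting at t - (t % sizeMs), covering [start, start + sizeMs).
    static long windowStart(long timestamp, long sizeMs) {
        return timestamp - (timestamp % sizeMs);
    }

    public static void main(String[] args) {
        // Timestamps 1 through 4 all land in the window [0, 500)...
        System.out.println(windowStart(1, 500));   // 0
        System.out.println(windowStart(4, 500));   // 0
        // ...while 501 lands in the next window, [500, 1000).
        System.out.println(windowStart(501, 500)); // 500
    }
}
```

So "Test:1" through "Test:4" should all be counted in the same window, which is why I expect a count of 4 after the restart.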

Here is what I see when running it:

Producer -> Test:1
Consumer -> Test:1
Producer -> Test:2
Producer -> Test:3
Consumer -> Test:3
(restart)
Producer -> Test:4
Consumer -> Test:1

I would expect the last consumer record to be Test:4, since there are 4 records in the 0-500 window.

When using the built-in timestamps, Kafka continues aggregating where it left off if the application is restarted. Also, if I use custom timestamps without windowing, it also works fine.

Main.class

package test.kafka;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class Main {
    private static final Logger logger = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) {
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "kafka-consumer-client");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,"test.kafka.CustomKafkaTimestamp");
        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,"var/");

        KStreamBuilder builder = new KStreamBuilder();

        KStream<String,String> testStream = builder.stream("test-topic");
        testStream.foreach((key,value)->logger.info("Got "+key+"="+value));
        testStream.groupByKey().count(TimeWindows.of(500))
                .toStream()
                .map((key, value) -> new KeyValue<>(key.key(),value))
                .to(Serdes.String(),Serdes.Long(),"test-output");

        KafkaStreams streams = new KafkaStreams(builder,streamsConfiguration);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(()->{streams.close(5, TimeUnit.SECONDS); streams.cleanUp();}));
    }
}

CustomKafkaTimestamp.class

package test.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CustomKafkaTimestamp implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long previousTimestamp) {
        if (record.value() instanceof String) {
            String str = (String) record.value();
            return Long.parseLong(str);
        }
        return 0;
    }
}

build.gradle

group 'local.kafka'
version '1.0-SNAPSHOT'

apply plugin: 'java'
apply plugin: 'application'

mainClassName = 'test.kafka.Main'

sourceCompatibility = 1.8

repositories {
    mavenCentral()
}

dependencies {
    compile 'org.apache.kafka:kafka-clients:0.11.0.1'
    compile 'org.apache.kafka:kafka-streams:0.11.0.1'
    compile group: 'org.apache.logging.log4j', name: 'log4j-slf4j-impl', version: '2.10.0'
    compile group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.10.0'
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

Edit/Update

As suggested, I tried running the code with version 1.0.0. It still doesn't work, but this time it stack-traces and crashes. At this point it's worth noting that I'm developing on a Windows system - it looks like it's trying to put a colon in a directory name.

I tried running the client on a Linux VM. With 0.11.0.1 it behaves the same as in my original description. Using 1.0 on Linux results in the expected behavior - state is preserved across restarts.
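Judging from the path in the stack trace, the segmented window store seems to name each segment directory "storeName:segmentStart". A colon is a reserved character in Windows file names, which would explain why RocksDB fails to create the directory there but works on Linux. A minimal sketch of that naming (the separator and format are an assumption reconstructed from the path in the trace, not the actual Streams code):

```java
public class SegmentNameSketch {
    // Hypothetical reconstruction of the segment directory name seen in
    // the trace: "<storeName>:<segmentStart>". The ':' is a legal filename
    // character on Linux but reserved on Windows, so directory creation fails.
    static String segmentName(String storeName, long segmentStart) {
        return storeName + ":" + segmentStart;
    }

    public static void main(String[] args) {
        String name = segmentName("KSTREAM-AGGREGATE-STATE-STORE-0000000002", 0);
        System.out.println(name);
        System.out.println(name.indexOf(':') >= 0); // the character Windows rejects
    }
}
```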

Stack trace when using Streams 1.0.0 on Windows:

10:50:48.188 [] INFO  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] State transition from PARTITIONS_ASSIGNED to RUNNING
10:50:48.189 [] INFO  org.apache.kafka.streams.KafkaStreams - stream-client [kafka-consumer-client]State transition from REBALANCING to RUNNING
10:50:48.225 [] INFO  test.kafka.Main - Got Test=1
10:50:48.230 [] INFO  test.kafka.Main - Got Test=2
10:51:19.753 [] ERROR org.apache.kafka.streams.processor.internals.ProcessorStateManager - task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002:
org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-0000000002:0 at location var\test\0_0\KSTREAM-AGGREGATE-STATE-STORE-0000000002\KSTREAM-AGGREGATE-STATE-STORE-0000000002:0
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:100) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:132) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:128) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:?]
Caused by: org.rocksdb.RocksDBException: Failed to create dir: C:\Users\Chris Lastname\IdeaProjects\kafka-test\build\install\kafka-test\bin\var\test\0_0\KSTREAM-AGGREGATE-STATE-STORE-0000000002\KSTREAM-AGGREGATE-STATE-STORE-0000000002:0: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.7.3.jar:?]
        at org.rocksdb.RocksDB.open(RocksDB.java:231) ~[rocksdbjni-5.7.3.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197) ~[kafka-streams-1.0.0.jar:?]
        ... 29 more
10:51:19.757 [] ERROR org.apache.kafka.streams.processor.internals.AssignedTasks - stream-thread [kafka-consumer-client-StreamThread-1] Failed to commit stream task 0_0 due to the following error:
org.apache.kafka.streams.errors.ProcessorStateException: task [0_0] Failed to flush state store KSTREAM-AGGREGATE-STATE-STORE-0000000002
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:248) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AbstractTask.flushState(AbstractTask.java:196) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.flushState(StreamTask.java:324) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:304) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:299) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:289) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:87) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:451) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:380) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:309) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:1018) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:835) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774) [kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744) [kafka-streams-1.0.0.jar:?]
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error opening store KSTREAM-AGGREGATE-STATE-STORE-0000000002:0 at location var\test\0_0\KSTREAM-AGGREGATE-STATE-STORE-0000000002\KSTREAM-AGGREGATE-STATE-STORE-0000000002:0
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:204) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:100) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:132) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:128) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245) ~[kafka-streams-1.0.0.jar:?]
        ... 14 more
Caused by: org.rocksdb.RocksDBException: Failed to create dir: C:\Users\Chris Lastname\IdeaProjects\kafka-test\build\install\kafka-test\bin\var\test\0_0\KSTREAM-AGGREGATE-STATE-STORE-0000000002\KSTREAM-AGGREGATE-STATE-STORE-0000000002:0: Invalid argument
        at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.7.3.jar:?]
        at org.rocksdb.RocksDB.open(RocksDB.java:231) ~[rocksdbjni-5.7.3.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:197) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:174) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.Segment.openDB(Segment.java:40) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.Segments.getOrCreateSegment(Segments.java:89) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBSegmentedBytesStore.put(RocksDBSegmentedBytesStore.java:81) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:43) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.RocksDBWindowStore$RocksDBWindowBytesStore.put(RocksDBWindowStore.java:34) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:67) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ChangeLoggingWindowBytesStore.put(ChangeLoggingWindowBytesStore.java:33) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.CachingWindowStore$1.apply(CachingWindowStore.java:100) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:141) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:99) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.ThreadCache.flush(ThreadCache.java:127) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.CachingWindowStore.flush(CachingWindowStore.java:132) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.state.internals.MeteredWindowStore.flush(MeteredWindowStore.java:128) ~[kafka-streams-1.0.0.jar:?]
        at org.apache.kafka.streams.processor.internals.ProcessorStateManager.flush(ProcessorStateManager.java:245) ~[kafka-streams-1.0.0.jar:?]
        ... 14 more
10:51:19.760 [] INFO  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] State transition from RUNNING to PENDING_SHUTDOWN
10:51:19.761 [] INFO  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] Shutting down
10:51:19.765 [] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=kafka-consumer-client-StreamThread-1-producer] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
10:51:19.770 [] INFO  org.apache.kafka.streams.processor.internals.StreamThread - stream-thread [kafka-consumer-client-StreamThread-1] State transition from PENDING_SHUTDOWN to DEAD
10:51:19.772 [] INFO  org.apache.kafka.streams.KafkaStreams - stream-client [kafka-consumer-client]State transition from RUNNING to ERROR
10:51:19.772 [] WARN  org.apache.kafka.streams.KafkaStreams - stream-client [kafka-consumer-client]All stream threads have died. The instance will be in error state and should be closed.