首页 文章

为什么Kafka KTable缺少参赛作品?

提问于
浏览
3

我有一个使用Kafka Streams的KTable的单实例java应用程序 . 直到最近,当突然有些消息似乎消失时,我可以使用KTable检索所有数据 . 那里应该有~33k个带有唯一键的消息 .

当我想按键检索消息时,我没有得到一些消息 . 我使用ReadOnlyKeyValueStore来检索消息:

final ReadOnlyKeyValueStore<GenericRecord, GenericRecord> store = ((KafkaStreams)streams).store(storeName, QueryableStoreTypes.keyValueStore());
store.get(key);

这些是我设置为KafkaStreams的配置设置 .

final Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_SERVER_CONFIG, serverId);
config.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class);
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);

Kafka :0.10.2.0-cp1
Confluent :3.2.0

调查给我带来了一些非常令人担忧的见解 . 使用REST代理我手动读取分区,发现一些偏移返回错误 .

要求: /topics/{topic}/partitions/{partition}/messages?offset={offset}

{
    "error_code": 50002,
    "message": "Kafka error: Fetch response contains an error code: 1"
}

没有客户端,java和命令行都不会返回任何错误 . 他们只是跳过错误的丢失消息,导致KTables中缺少数据 . 一切都很好,没有通知似乎某些消息已经腐败 .

我有两个代理,所有主题的复制因子都是2,并且完全复制 . 两个经纪人分别返回相同的 . 重启经纪人并没有什么区别 .

  • What could possibly be the cause?

  • How to detect this case in a client?

1 回答

  • 1

    通过default Kafka Broker配置键 cleanup.policy 设置为 delete . 将其设置为 compact 以保留每个密钥的最新消息 . See compaction .

    删除旧消息不会更改最小偏移量,因此尝试检索其下方的消息会导致错误 . 错误非常模糊 . Kafka Streams客户端将开始从最小偏移量读取消息,因此没有错误 . 唯一可见的效果是缺少KTables中的数据 .

    由于caches应用程序正在运行,即使从Kafka本身删除消息后,所有数据仍可用 . 他们将在清理后消失 .

相关问题