问题背景

目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子2) . 已安装的Kafka版本为1.1.1 .

最终用户向我们报告数据消失的问题 . 他们报告突然他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,第二天早上的 table 是空的) . 我们检查了这个特定用户的更改日志主题,看起来很奇怪,看起来几天不活动(给定键值对可能会保持数天不变)更改日志主题中的聚合值丢失了 .

代码

KTable装配线:(消息按事件中的'用户名'分组)


@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
    return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
                         .groupBy((key, event) -> event.getUsername(),
                                 Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
                         .aggregate(
                                 UserItems::none,
                                 (key, event, userItems) ->
                                         userItems.after(event),
                                 Materialized
                                         .<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
                                         .withKeySerde(serdes.forA(UsernameVO.class))
                                         .withValueSerde(serdes.forA(UserItems.class)));
}

聚合对象(KTable值):


public class UserItems {

private final Map<String, Item> items;

public static UserItems none() {
    return new UserItems();
}

private UserItems() {
    this(emptyMap());
}

@JsonCreator
private UserItems(Map<String, Item> userItems) {
    this.userItems = userItems;
}

@JsonValue
@SuppressWarnings("unused")
Map<String, Item> getUserItems() {
    return Collections.unmodifiableMap(items);
}

...
public UserItems after(ItemAddedEvent itemEvent) {
    Item item = Item.from(itemEvent);

    Map<String, Item> newItems = new HashMap<>(items);
    newItems.put(itemEvent.getItemName(), item);
    return new UserItems(newItems);
}

Kafka 主题

application-user-UserItems


这个源主题没有问题 . 它将保留设置为最大值,所有消息始终存在 .


application-user-UserItems-store-changelog (压缩 . 具有默认配置 - 没有更改保留,也没有任何更改)


这是奇怪的部分 . 我们可以在更改日志中观察到某些用户的值丢失了:

Offset | Partition |   Key   |  Value  
...........................................  
...  
320         0        "User1" : {"ItemName1":{"param":"foo"}}  
325         0        "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}  
1056        0        "User1" : {"ItemName3":{"param":"zyx"}}  
...

我们可以在上面看到,首先正确地聚集了消息:处理了Item1,然后将Item2应用于聚合 . 但是经过一段时间 - 可能是几天 - 正在处理另一个事件 - 底层“User1”键下的值似乎丢失了,只有Item3存在 .

在应用程序中,用户无法删除所有项目并在一个操作中添加另一个项目 - 用户只能逐个添加或删除项目 . 因此,如果他删除ItemName1和ItemName2然后添加ItemName3,我们_1151556_就像更改日志中的那样:

Offset | Partition |   Key   |  Value   
..............................................  
...  
320         0        "User1" : {"ItemName1":{"param":"foo"}}   
325         0        "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}   
1054        0        "User1" : {"ItemName2":{"param":"bar"}}   
1055        0        "User1" : {}   
1056        0        "User1" : {"ItemName3":{"param":"zyx"}}

结论

起初我们认为它与changelog主题保留有关(但我们检查了它并且它只启用了压缩) .

application-user-UserItems-store-changelog  PartitionCount:3    ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600   
    Topic: application-user-UserItems-store-changelog   Partition: 0    Leader: 0   Replicas: 0 Isr: 0   
    Topic: application-user-UserItems-store-changelog   Partition: 1    Leader: 2   Replicas: 2 Isr: 2   
    Topic: application-user-UserItems-store-changelog   Partition: 2    Leader: 1   Replicas: 1 Isr:

任何想法或提示将不胜感激 . 干杯