问题背景
目前我们正在使用:Kafka Streams API(版本1.1.0)来处理来自Kafka集群的消息(3个代理,每个主题3个分区,复制因子2) . 已安装的Kafka版本为1.1.1 .
最终用户向我们报告数据消失的问题 . 他们报告突然他们看不到任何数据(例如,昨天他们可以在UI中看到n条记录,第二天早上的 table 是空的) . 我们检查了这个特定用户的更改日志主题,看起来很奇怪,看起来几天不活动(给定键值对可能会保持数天不变)更改日志主题中的聚合值丢失了 .
代码
KTable装配线:(消息按事件中的'用户名'分组)
@Bean
public KTable<UsernameVO, UserItems> itemsOfTheUser() {
return streamsBuilder.stream("application-user-UserItems", Consumed.with(Serdes.String(), serdes.forA(UserItems.class)))
.groupBy((key, event) -> event.getUsername(),
Serialized.with(serdes.forA(UsernameVO.class), serdes.forA(UserItems.class)))
.aggregate(
UserItems::none,
(key, event, userItems) ->
userItems.after(event),
Materialized
.<UsernameVO, UserItems> as(persistentKeyValueStore("application-user-UserItems"))
.withKeySerde(serdes.forA(UsernameVO.class))
.withValueSerde(serdes.forA(UserItems.class)));
}
聚合对象(KTable值):
public class UserItems {
private final Map<String, Item> items;
public static UserItems none() {
return new UserItems();
}
private UserItems() {
this(emptyMap());
}
@JsonCreator
private UserItems(Map<String, Item> userItems) {
this.userItems = userItems;
}
@JsonValue
@SuppressWarnings("unused")
Map<String, Item> getUserItems() {
return Collections.unmodifiableMap(items);
}
...
public UserItems after(ItemAddedEvent itemEvent) {
Item item = Item.from(itemEvent);
Map<String, Item> newItems = new HashMap<>(items);
newItems.put(itemEvent.getItemName(), item);
return new UserItems(newItems);
}
Kafka 主题
application-user-UserItems
这个源主题没有问题 . 它将保留设置为最大值,所有消息始终存在 .
application-user-UserItems-store-changelog (压缩 . 具有默认配置 - 没有更改保留,也没有任何更改)
这是奇怪的部分 . 我们可以在更改日志中观察到某些用户的值丢失了:
Offset | Partition | Key | Value
...........................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
...
我们可以在上面看到,首先正确地聚集了消息:处理了Item1,然后将Item2应用于聚合 . 但是经过一段时间 - 可能是几天 - 正在处理另一个事件 - 底层“User1”键下的值似乎丢失了,只有Item3存在 .
在应用程序中,用户无法删除所有项目并在一个操作中添加另一个项目 - 用户只能逐个添加或删除项目 . 因此,如果他删除ItemName1和ItemName2然后添加ItemName3,我们_1151556_就像更改日志中的那样:
Offset | Partition | Key | Value
..............................................
...
320 0 "User1" : {"ItemName1":{"param":"foo"}}
325 0 "User1" : {"ItemName1":{"param":"foo"},"ItemName2":{"param":"bar"}}
1054 0 "User1" : {"ItemName2":{"param":"bar"}}
1055 0 "User1" : {}
1056 0 "User1" : {"ItemName3":{"param":"zyx"}}
结论
起初我们认为它与changelog主题保留有关(但我们检查了它并且它只启用了压缩) .
application-user-UserItems-store-changelog PartitionCount:3 ReplicationFactor:1 Configs:cleanup.policy=compact,max.message.bytes=104857600
Topic: application-user-UserItems-store-changelog Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: application-user-UserItems-store-changelog Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: application-user-UserItems-store-changelog Partition: 2 Leader: 1 Replicas: 1 Isr:
任何想法或提示将不胜感激 . 干杯