我有一个使用Kafka的基于Spring Cloud Stream的微服务 .
我创建了一个包含4个分区的kafka主题 .
我在yml中配置了以下内容:
spring:
cloud:
stream:
bindings:
SYNC_TABLE:
content-type: application/json
partitionKeyExpression: payload.partitionKey
partitionCount: 4
destination: ${envTopicPrefix}.LEGACY_TABLE
在我的代码中,我的消息类包含(在其超类中)partitionKey变量:
@Data
@EqualsAndHashCode(callSuper=true)
@ToString(callSuper=true)
public class TransactionResponse extends GeneralOutputMessage{
}
@Data
@ToString
public class GeneralOutputMessage {
private String operationType;
private List<String> affectedFields;
private Object data;
private String eventId;
private String eventName;
private String partitionKey;
}
我发送TransactionsResponse对象作为消息:
final TransactionResponse transactionResponse = handler.handleEvent(event);
if (transactionResponse != null) {
outputChannels.tableSync().send(MessageBuilder.withPayload(transactionResponse).build());
log.info("Message Sent: {}", transactionResponse);
}
我的期望是Spring Cloud 流将获取密钥payload.partitionKey,计算其hashCode()%4,并将事件发送到该分区 .
但是,逻辑是完全随机的 . 这里有一些例子:
Math.abs(“111615631”.hashCode()%4)= 1.但是,消息将发送到3号分区 .
Math.abs(“110019882”.hashCode()%4)= 2.但是,消息将发送到分区号0 .
Math.abs("943152574" .hashCode()%4)= 0.此消息确实被发送到分区号0 .
Math.abs("943198862" .hashCode()%4)= 0.但是,此消息将发送到分区号2 .
我正在使用Dalston.SR1发布列车 .
我在这里想念的是什么?
谢谢 .
Update:
只是尝试使用相同的partitionKey发送相同的事件(但略有不同的消息体) . 即使分区键相同,该消息也会转到两个不同的分区 . 看起来Spring Cloud Stream完全忽略了partitionKeyExpression .
1 回答
这是我的错,我忘了在yml中添加producer:section: