While upgrading one of our services to spring-cloud-stream 2.0.0.RC3, we ran into an exception when consuming messages produced by a service still on the older spring-cloud-stream version, Ditmars.RELEASE:
ERROR 31241 --- [container-4-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.converter.MessageConversionException: Cannot convert from [[B] to [com.watercorp.messaging.types.incoming.UsersDeletedMessage] for GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=user, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}], failedMessage=GenericMessage [payload=byte[371], headers={kafka_offset=1, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@62029d0d, kafka_timestampType=CREATE_TIME, message_id=1645508761, id=f4e947de-22e6-b629-229b-4fa961c73f2d, type=USERS_DELETED, kafka_receivedPartitionId=4, contentType=text/plain, kafka_receivedTopic=user, kafka_receivedTimestamp=1521641760698, timestamp=1521641772477}]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:144)
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:116)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:137)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:109)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164)
    at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:745)
It looks like the cause is that the contentType header sent with the message is text/plain, although it should be application/json.
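A minimal plain-Java sketch of why the mismatched header fails (the class and method names here are illustrative, not the actual Spring internals): the framework selects a message converter based on the contentType header, so a text/plain payload is never handed to the JSON converter, and the raw byte[] cannot be resolved into the UsersDeletedMessage parameter:

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Toy model of header-driven converter selection; names are hypothetical.
public class ContentTypeDemo {

    /** Mimics converter lookup: only application/json payloads become POJOs. */
    static Object convert(byte[] payload, Map<String, String> headers) {
        String contentType = headers.getOrDefault("contentType", "application/octet-stream");
        if (contentType.startsWith("application/json")) {
            // A real converter would run Jackson here; we just mark success.
            return "POJO";
        }
        if (contentType.startsWith("text/plain")) {
            // text/plain converts to String, not to UsersDeletedMessage --
            // the @StreamListener parameter type then fails to resolve.
            return new String(payload, StandardCharsets.UTF_8);
        }
        throw new IllegalStateException("Cannot convert from [[B] with contentType " + contentType);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"userIds\":[1,2]}".getBytes(StandardCharsets.UTF_8);
        System.out.println(convert(payload, Map.of("contentType", "application/json")));
        System.out.println(convert(payload, Map.of("contentType", "text/plain")));
    }
}
```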
Producer configuration:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          zkNodes: zookeeper
          defaultZkPort: 2181
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          user-output:
            producer:
              sync: true
              configuration:
                retries: 10000
      default:
        binder: kafka
        contentType: application/json
        group: user-service
        consumer:
          maxAttempts: 1
        producer:
          partitionKeyExtractorClass: com.watercorp.user_service.messaging.PartitionKeyExtractor
      bindings:
        user-output:
          destination: user
          producer:
            partitionCount: 5
Consumer configuration:
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: kafka
          defaultBrokerPort: 9092
          minPartitionCount: 2
          replicationFactor: 1
          autoCreateTopics: true
          autoAddPartitions: true
          headers: type,message_id
          requiredAcks: 1
          configuration:
            "[security.protocol]": PLAINTEXT #TODO: This is a workaround. Should be security.protocol
        bindings:
          user-input:
            consumer:
              autoRebalanceEnabled: true
              autoCommitOnError: true
              enableDlq: true
      default:
        binder: kafka
        contentType: application/json
        group: enrollment-service
        consumer:
          maxAttempts: 1
          headerMode: embeddedHeaders
        producer:
          partitionKeyExtractorClass: com.watercorp.messaging.PartitionKeyExtractor
          headerMode: embeddedHeaders
      bindings:
        user-input:
          destination: user
          consumer:
            concurrency: 5
            partitioned: true
Consumer @StreamListener:
@StreamListener(target = UserInput.INPUT, condition = "headers['type']=='" + USERS_DELETED + "'")
public void handleUsersDeletedMessage(@Valid UsersDeletedMessage usersDeletedMessage, @Header(value = "kafka_receivedPartitionId",
required = false) String partitionId, @Header(value = KAFKA_TOPIC_HEADER_NAME, required = false) String topic, @Header(MESSAGE_ID_HEADER_NAME) String messageId) throws Throwable {
    logger.info(String.format("Received users deleted message, message id: %s topic: %s partition: %s", messageId, topic, partitionId));
handleMessageWithRetry(_usersDeletedMessageHandler, usersDeletedMessage, messageId, topic);
}
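The condition above dispatches on the type header. Conceptually (a plain-Java sketch with hypothetical names, not the Spring Cloud Stream internals), the condition-based @StreamListener routing behaves like:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Illustrative stand-in for condition-based @StreamListener dispatch:
// each handler is registered under the value of the "type" header.
public class TypeHeaderDispatcher {

    private final Map<String, Consumer<byte[]>> handlers = new HashMap<>();

    void register(String type, Consumer<byte[]> handler) {
        handlers.put(type, handler);
    }

    /** Routes a message by its "type" header, as the SpEL condition does. */
    boolean dispatch(Map<String, String> headers, byte[] payload) {
        Consumer<byte[]> handler = handlers.get(headers.get("type"));
        if (handler == null) {
            return false; // no condition matched; the message is not handled
        }
        handler.accept(payload);
        return true;
    }

    public static void main(String[] args) {
        TypeHeaderDispatcher dispatcher = new TypeHeaderDispatcher();
        dispatcher.register("USERS_DELETED",
                p -> System.out.println("handling USERS_DELETED, " + p.length + " bytes"));
        dispatcher.dispatch(Map.of("type", "USERS_DELETED", "message_id", "1645508761"),
                new byte[371]);
    }
}
```

Note this only illustrates the routing idea; in the real framework the payload is converted to the listener's parameter type first, which is exactly the step that fails above.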
1 Answer
This is a bug in RC3; it was recently fixed on master and will be in the GA release at the end of this month. In the meantime, could you try 2.0.0.BUILD-SNAPSHOT?
I was able to reproduce the problem, and the snapshot fixed it for me...
EDIT
For completeness:
Ditmars producer:
and
Elmhurst consumer:
and the results:
The header-mode is needed because the default in 2.0 is native, now that Kafka itself supports headers.
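For example, a sketch (reusing the binding names from the configs above) of forcing embedded headers on an Elmhurst (2.0) consumer that reads messages from a Ditmars producer, which embeds headers in the payload rather than using native Kafka record headers:

```yaml
spring:
  cloud:
    stream:
      bindings:
        user-input:
          destination: user
          consumer:
            # 2.0 defaults to headerMode: native (Kafka record headers);
            # Ditmars producers embed headers inside the payload instead.
            headerMode: embeddedHeaders
```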