我想用spring boot设置一个spring-cloud-stream-kafka制作人 .
生产环境 者正在工作,我可以使用来自kafka代理的消息,但消息还包含一些 Headers 信息,如下所示:
contentType "text/plain"originalContentType "application/json;charset=UTF-8"{"message":"hello"}
我的POJO包含一个字段(字符串消息)所以我期望只有JSON字符串将被发送到kafka .
我的RestController中的方法test()触发 生产环境 者:
@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {
private MessageChannel consumer;
public KafkaStreamProducerApplication(ProducerChannels channels) {
this.consumer = channels.consumer();
}
@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
this.consumer.send(msg);
}
interface ProducerChannels {
@Output
MessageChannel consumer();
}
我的application.properties
spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json
如果您能就此主题推荐任何文档或示例,我将不胜感激 . github上的示例通常很薄,它们使用了大量的自动配置而没有解释 . 我使用的例子是RabbitMQ .
2 回答
当消费者应用程序对消息进行反序列化时,Spring Cloud Stream会使用
contentType
和originalContentType
标头,并根据内容类型集执行消息转换 .只有在配置绑定的内容类型时才会显式设置
contentType
标头,就像在此处spring.cloud.stream.bindings.consumer.content-type=application/json
一样 . 设置contentType
标头后,Spring Cloud Stream会在序列化/反序列化过程中使用originalContentType
标志保留此标头,从而生成/消费来自代理的消息(通过 Binders ) .在你的情况下,我猜你可能根本不需要设置
contentType
.对于除spring-cloud-stream-samples github repo中的样本之外的示例,您还可以参考out of the box app starters,其涵盖可能针对任何支持的 Binders (包括Kafka)运行的各种应用程序 .
如果您想避免嵌入 Headers (因此您可以在某些非Spring Cloud Stream应用程序中接收消息),请将 生产环境 者的
headerMode
设置为raw
.见Producer Properties .