首页 文章

Spring Cloud Stream Kafka Producer消息

提问于
浏览
0

我想用spring boot设置一个spring-cloud-stream-kafka制作人 .

生产环境 者正在工作,我可以使用来自kafka代理的消息,但消息还包含一些 Headers 信息,如下所示:

contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"message":"hello"}

我的POJO包含一个字段(字符串消息)所以我期望只有JSON字符串将被发送到kafka .

我的RestController中的方法test()触发 生产环境 者:

@EnableBinding(ProducerChannels.class)
@SpringBootApplication
@RestController
public class KafkaStreamProducerApplication {

private MessageChannel consumer;

public KafkaStreamProducerApplication(ProducerChannels channels) {
    this.consumer = channels.consumer();
}

@PostMapping("/test/{message}")
public void test(@PathVariable String message) {
    Message<MyMessage> msg = MessageBuilder.withPayload(new MyMessage(message)).build();
    this.consumer.send(msg);
}

interface ProducerChannels {

    @Output
    MessageChannel consumer();
}

我的application.properties

spring.cloud.stream.bindings.consumer.destination=consumer
spring.cloud.stream.bindings.consumer.content-type=application/json

如果您能就此主题推荐任何文档或示例,我将不胜感激 . github上的示例通常很薄,它们使用了大量的自动配置而没有解释 . 我使用的例子是RabbitMQ .

2 回答

  • 0

    当消费者应用程序对消息进行反序列化时,Spring Cloud Stream会使用 contentTypeoriginalContentType 标头,并根据内容类型集执行消息转换 .

    只有在配置绑定的内容类型时才会显式设置 contentType 标头,就像在此处 spring.cloud.stream.bindings.consumer.content-type=application/json 一样 . 设置 contentType 标头后,Spring Cloud Stream会在序列化/反序列化过程中使用 originalContentType 标志保留此标头,从而生成/消费来自代理的消息(通过 Binders ) .

    在你的情况下,我猜你可能根本不需要设置 contentType .

    对于除spring-cloud-stream-samples github repo中的样本之外的示例,您还可以参考out of the box app starters,其涵盖可能针对任何支持的 Binders (包括Kafka)运行的各种应用程序 .

  • 1

    如果您想避免嵌入 Headers (因此您可以在某些非Spring Cloud Stream应用程序中接收消息),请将 生产环境 者的 headerMode 设置为 raw .

    Producer Properties .

    headerMode设置为raw时,禁用输出中的标头嵌入 . 仅对本身不支持邮件头并且需要标头嵌入的邮件中间件有效 . 在为非Spring Cloud Stream应用程序生成数据时很有用 . 默认值:embeddedHeaders .

相关问题