首页 文章

Kafka制作人JSON序列化

提问于
浏览
1

我正在尝试使用Spring Cloud Stream与Kafka集成 . 正在编写的消息是Java POJO,虽然它按预期工作(消息正在写入主题,我可以使用消费者应用程序读取),但是在消息的开头添加了一些未知的字符尝试集成Kafka Connect以接收来自主题的消息时出现问题 .

使用默认设置,这是推送到Kafka的消息:

 contentType   "text/plain"originalContentType    "application/json;charset=UTF-8"{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471,"version":null}}

如果我在Java应用程序中配置Kafka生成器,则消息将写入主题而不包含前导字符/ Headers :

@Configuration
public class KafkaProducerConfig {

    @Bean
    public ProducerFactory<String, Object> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            "localhost:9092");
        configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
        configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            JsonSerializer.class);

        return new DefaultKafkaProducerFactory<String, Object>(configProps);
    }

    @Bean
    public KafkaTemplate<String, Object> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

关于 Kafka 的消息:

{"payload":{"username":"john"},"metadata":{"eventName":"Login","sessionId":"089acf50-00bd-47c9-8e49-dc800c1daf50","username":"john","hasSent":null,"createDate":1511186145471}

由于我希望能够在 application.yml 属性文件中执行此操作,而不是通过代码执行此操作 . 但是,当更新yml以指定序列化程序时,它's not working as I would expect i.e. it's不会生成与Java中配置的生成器相同的消息(上面):

spring:
  profiles: local
  cloud:
    stream:
      bindings:
        session:
          destination: session
          contentType: application/json
      kafka:
        binder:
          brokers: localhost
          zkNodes: localhost
          defaultZkPort: 2181
          defaultBrokerPort: 9092
        bindings:
          session:
            producer:
              configuration:
                value:
                  serializer: org.springframework.kafka.support.serializer.JsonSerializer
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer

关于 Kafka 的消息:

"/wILY29udGVudFR5cGUAAAAMInRleHQvcGxhaW4iE29yaWdpbmFsQ29udGVudFR5cGUAAAAgImFwcGxpY2F0aW9uL2pzb247Y2hhcnNldD1VVEYtOCJ7InBheWxvYWQiOnsidXNlcm5hbWUiOiJqb2huIn0sIm1ldGFkYXRhIjp7ImV2ZW50TmFtZSI6IkxvZ2luIiwic2Vzc2lvbklkIjoiNGI3YTBiZGEtOWQwZS00Nzg5LTg3NTQtMTQyNDUwYjczMThlIiwidXNlcm5hbWUiOiJqb2huIiwiaGFzU2VudCI6bnVsbCwiY3JlYXRlRGF0ZSI6MTUxMTE4NjI2NDk4OSwidmVyc2lvbiI6bnVsbH19"

是否可以通过应用程序yml单独配置它?是否有其他缺少的设置?

2 回答

  • 1

    请参阅 生产环境 者属性( ....session.producer.useNativeEncoding )中的 headerModeuseNativeEncoding .

    headerMode设置为raw时,禁用输出中的标头嵌入 . 仅对本身不支持邮件头并且需要标头嵌入的邮件中间件有效 . 在为非Spring Cloud Stream应用程序生成数据时很有用 . 默认值:embeddedHeaders . useNativeEncoding设置为true时,出站消息由客户端库直接序列化,必须相应地配置(例如,设置适当的Kafka 生产环境 者值序列化程序) . 使用此配置时,出站邮件编组不基于绑定的contentType . 当使用本机编码时,消费者有责任使用适当的解码器(例如:Kafka消费者 Value 反序列化器)来反序列化入站消息 . 此外,使用本机编码/解码时,将忽略headerMode属性,并且不会将标头嵌入到消息中 . 默认值:false .

  • 1

    感谢@Gary获得上述答案!

    为了完整起见,现在正在为我工作的配置如下 .

    spring:
      profiles: local
      cloud:
        stream:
          bindings:
            session:
              producer:
                useNativeEncoding: true
              destination: session
              contentType: application/json
          kafka:
            binder:
              brokers: localhost
              zkNodes: localhost
              defaultZkPort: 2181
              defaultBrokerPort: 9092
            bindings:
              session:
                producer:
                  configuration:
                    value:
                      serializer: org.springframework.kafka.support.serializer.JsonSerializer
                    key:
                      serializer: org.apache.kafka.common.serialization.StringSerializer
    

相关问题