
Spring Cloud Stream Kafka > Consuming Avro messages produced with the Confluent REST Proxy


I have the following scenario: my application looks like this:

@SpringBootApplication
@EnableBinding(Sink.class)
public class MyApplication {
  private static Logger log = LoggerFactory.getLogger(MyApplication.class);

  public static void main(String[] args) {
    SpringApplication.run(MyApplication.class, args);
  }

  @StreamListener(Sink.INPUT)
  public void myMessageSink(MyMessage message) {
    log.info("Received new message: {}", message);
  }
}

MyMessage is a class generated by Avro from an Avro schema.
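For context, a class like MyMessage would be generated from an .avsc schema file. A minimal hypothetical example (the field names here are invented for illustration; the stack trace below suggests the record contains a map with string keys):

```json
{
  "namespace": "com.example",
  "type": "record",
  "name": "MyMessage",
  "fields": [
    { "name": "id", "type": "string" },
    { "name": "attributes", "type": { "type": "map", "values": "string" } }
  ]
}
```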

My application.properties looks like this:

spring.cloud.stream.bindings.input.destination=myTopic
spring.cloud.stream.bindings.input.group=${spring.application.name}
spring.cloud.stream.bindings.input.contentType=application/*+avro

My problem is that every time a new message is received, the following exception is thrown:

org.springframework.messaging.MessagingException: Exception thrown while invoking MyApplication#myMessageSink[1 args]; nested exception is org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:316) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) ~[spring-integration-core-4.3.2.RELEASE.jar:4.3.2.RELEASE]
    ...
Caused by: org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -27
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:430) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMapKey(GenericDatumReader.java:335) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:321) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152) ~[avro-1.8.1.jar:1.8.1]
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144) ~[avro-1.8.1.jar:1.8.1]
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertFromInternal(AbstractAvroMessageConverter.java:91) ~[spring-cloud-stream-schema-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    at org.springframework.messaging.converter.AbstractMessageConverter.fromMessage(AbstractMessageConverter.java:175) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.converter.CompositeMessageConverter.fromMessage(CompositeMessageConverter.java:67) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.annotation.support.PayloadArgumentResolver.resolveArgument(PayloadArgumentResolver.java:117) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:112) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:138) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:107) ~[spring-messaging-4.3.3.RELEASE.jar:4.3.3.RELEASE]
    at org.springframework.cloud.stream.binding.StreamListenerAnnotationBeanPostProcessor$StreamListenerMessageHandler.handleRequestMessage(StreamListenerAnnotationBeanPostProcessor.java:307) ~[spring-cloud-stream-1.1.0.RELEASE.jar:1.1.0.RELEASE]
    ... 35 common frames omitted

As far as I understand, the problem is that the Confluent stack embeds the ID of the message's schema as part of the message payload, and clients are expected to start reading the actual Avro message only after the schema ID. It seems I need to configure the Kafka binding to use Confluent's KafkaAvroDeserializer, but I cannot figure out how to achieve this.
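The framing described above can be sketched in plain Java. This is a minimal illustration, not Confluent's actual deserializer; the header size is left as a parameter because Confluent's documented wire format is one magic byte followed by a 4-byte schema ID (5 bytes total), and only the bytes after the header are valid Avro binary:

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormat {

    // Strip a fixed-size framing header from a Kafka record value and
    // return only the Avro-encoded bytes that follow it. The header size
    // is a parameter; Confluent's documented wire format uses 1 magic
    // byte plus a 4-byte schema ID (5 bytes total).
    static byte[] stripHeader(byte[] payload, int headerSize) {
        ByteBuffer buf = ByteBuffer.wrap(payload);
        buf.position(headerSize); // skip the framing header
        byte[] avroBytes = new byte[buf.remaining()];
        buf.get(avroBytes);
        return avroBytes;
    }

    public static void main(String[] args) {
        // 5-byte header (magic byte 0x0 + schema ID 42) followed by payload bytes 1,2,3
        byte[] framed = {0, 0, 0, 0, 42, 1, 2, 3};
        System.out.println(Arrays.toString(stripHeader(framed, 5))); // [1, 2, 3]
    }
}
```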

(I can retrieve the messages perfectly well with Confluent's avro console consumer, so it does not seem to be a problem with the Avro encoding.)

I also tried the @EnableSchemaRegistry annotation and configuring a ConfluentSchemaRegistryClient bean, but it looks to me like this only controls where the schemas are stored and retrieved from, not the actual deserialization.
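For reference, such a bean configuration looks roughly like this (a sketch only; the registry endpoint is an assumed local default, and ConfluentSchemaRegistryClient comes from spring-cloud-stream-schema):

```java
@Configuration
public class SchemaRegistryConfig {

    @Bean
    public SchemaRegistryClient schemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        // assumed default Confluent Schema Registry endpoint
        client.setEndpoint("http://localhost:8081");
        return client;
    }
}
```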

Is this even supposed to work somehow?

2 Answers

  • 0

    Does it work if you set the per-binding property spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer to Confluent's KafkaAvroDeserializer class name?
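    Untested sketch of that suggestion in application.properties; the schema.registry.url entry is an additional assumption, since Confluent's deserializer requires it:

    ```properties
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
    spring.cloud.stream.kafka.bindings.input.consumer.configuration.schema.registry.url=http://localhost:8081
    ```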

  • 0

    Answering my own question. What I did is implement a MessageConverter that simply strips the first 4 bytes off any message before passing it on to the Avro decoder. The code is mostly taken from spring-cloud-stream's AbstractAvroMessageConverter:

    import java.io.IOException;
    import java.nio.ByteBuffer;

    import org.apache.avro.Schema;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.Decoder;
    import org.apache.avro.io.DecoderFactory;
    import org.springframework.cloud.stream.schema.avro.AvroSchemaMessageConverter;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.converter.MessageConversionException;
    import org.springframework.util.MimeType;

    public class ConfluentAvroSchemaMessageConverter extends AvroSchemaMessageConverter {

        public ConfluentAvroSchemaMessageConverter() {
            super(new MimeType("application", "avro+confluent"));
        }

        @Override
        protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
            Object result = null;
            try {
                byte[] payload = (byte[]) message.getPayload();

                // byte array to contain the message without the confluent header (first 4 bytes)
                byte[] payloadWithoutConfluentHeader = new byte[payload.length - 4];
                ByteBuffer buf = ByteBuffer.wrap(payload);
                MimeType mimeType = getContentTypeResolver().resolve(message.getHeaders());
                if (mimeType == null) {
                    if (conversionHint instanceof MimeType) {
                        mimeType = (MimeType) conversionHint;
                    }
                    else {
                        return null;
                    }
                }

                // read the first 4 bytes and copy the rest into the new byte array
                // see https://groups.google.com/forum/#!topic/confluent-platform/rI1WNPp8DJU
                buf.getInt();
                buf.get(payloadWithoutConfluentHeader);

                Schema writerSchema = resolveWriterSchemaForDeserialization(mimeType);
                Schema readerSchema = resolveReaderSchemaForDeserialization(targetClass);
                DatumReader<Object> reader = getDatumReader((Class<Object>) targetClass, readerSchema, writerSchema);
                Decoder decoder = DecoderFactory.get().binaryDecoder(payloadWithoutConfluentHeader, null);
                result = reader.read(null, decoder);
            }
            catch (IOException e) {
                throw new MessageConversionException(message, "Failed to read payload", e);
            }
            return result;
        }
    }

    I then set the content type for the incoming Kafka topic to application/avro+confluent via application.properties.

    This at least lets me retrieve messages encoded with the Confluent stack, but of course it does not interact with the schema registry in any way.
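    The content-type setting mentioned above would look something like this in application.properties, matching the custom MimeType passed to the converter's constructor:

    ```properties
    spring.cloud.stream.bindings.input.contentType=application/avro+confluent
    ```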
