首页 文章

如何配置Spring Cloud 流(kafka)以使用protobuf作为序列化

提问于
浏览
1

我使用Spring Cloud 流(kafka)在 生产环境 者和消费者微服务之间交换消息 . 它与本机java序列化交换数据 . 根据Spring Cloud 文档,它支持JSON,AVRO序列化 .

有没有人在spring Cloud 流中尝试过protobuf序列化(消息转换器)

----------------后来添加

我写了这个MessageConverter

public class ProtobufMessageConverter<T extends AbstractMessage > extends AbstractMessageConverter
{
    private  Parser<T> parser;
    public ProtobufMessageConverter(Parser<T> parser)

    {
        super(new MimeType("application", "protobuf"));
        this.parser = parser;
    }

    @Override
    protected boolean supports(Class<?> clazz)
    {
        if (clazz != null)
        {
            return EquipmentProto.Equipment.class.isAssignableFrom(clazz);
        }
        return true;
    }

    @Override
    public Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint)
    {
        if (!(message.getPayload() instanceof byte[]))
        {
            return null;
        }
        try
        {
//            return EquipmentProto.Equipment.parseFrom((byte[]) message.getPayload());
            return parser.parseFrom((byte[]) message.getPayload());
        }
        catch (Exception e)
        {
            this.logger.error(e.getMessage(), e);
        }
        return null;
    }

    @Override
    protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint)
    {
        return ((AbstractMessage) payload).toByteArray();
    }

}

1 回答

  • 0

    这真的不是尝试而是仅仅是这样做的问题,因为转换器是spring-cloud-stream中的一种自然扩展机制(从spring-integration继承),专门用于解决这些问题 . 所以是的,您可以添加自己的自定义转换器 .

    另外,请记住,对于Kafka,还有一个本地serde的概念,所以你需要确保两者不会产生一些冲突 .

相关问题