首页 文章

带解码器问题的 Kafka Avro Consumer

提问于
浏览
8

当我尝试使用我各自的模式对数据运行Avaf 的卡夫卡消费者时,它返回错误“AvroRuntimeException:格式错误的数据。长度为负:-40”。我看到其他人有类似的问题将字节数组转换为 jsonAvro 写和读Kafka Avro Binary *编码员。我也引用了这个消费者群体示例,这些都有帮助,但到目前为止这个错误没有任何帮助..它可以工作直到这部分代码(第 73 行)

解码器解码器= DecoderFactory.get().binaryDecoder(byteArrayInputStream,null);

我已经尝试了其他解码器并打印出 byteArrayInputStream 变量的内容,它看起来我相信你会期望序列化的 avro 数据看起来(在消息中我可以看到模式和一些数据以及一些格式错误的数据)我打印出来了使用.available()方法可用的字节,返回 594.我无法理解为什么会发生此错误。 Apache Nifi 用于生成具有来自 hdfs 的相同模式的 Kafka 流。我将不胜感激任何帮助。

1 回答

  • 17

    也许问题是 Nifi 如何编写(编码)Avro 数据与消费者应用程序读取(解码)数据的方式不匹配。

    简而言之,Avro 的 API 提供了两种不同的序列化方法:

    • 用于创建正确的 Avro 文件:对数据记录进行编码,但也将 Avro 架构嵌入到一种前导码中(通过org.apache.avro.file.{DataFileWriter/DataFileReader})。将模式嵌入到 Avro 文件中非常有意义,因为(a)Avro 文件的“有效负载”通常比嵌入式 Avro 模式大一些,并且(b)然后您可以根据自己的内容复制或移动这些文件并且仍然可以确保您可以再次阅读它们,而无需咨询某人或某事。

    • 要仅编码数据记录,i.e。不嵌入模式(通过org.apache.avro.io.{BinaryEncoder/BinaryDecoder};注意包名称的区别:io这里与file上面)。例如,当正在写入 Kafka 主题的 Avro-encoding 消息时,这种方法通常会受到青睐,因为与上面的变体 1 相比,您不会在每条消息中产生 re-embedding Avro 模式的开销,假设您(非常合理)策略是,对于相同的 Kafka 主题,消息是 formatted/encoded 具有相同的 Avro 架构。这是一个显着的优点,因为在流数据上下文中,data-in-motion 数据记录通常比上述 data-at-rest Avro 文件小得多(通常在 100 字节到几百 KB 之间)(通常为数百或数千 MB);所以 Avro 架构的大小相对较大,因此在将 2000 个数据记录写入 Kafka 时,您不希望将其嵌入 2000x。缺点是您必须“以某种方式”跟踪 Avro 架构如何映射到 Kafka 主题 - 或者更准确地说,您必须以某种方式跟踪编码消息的 Avro 架构,而不必直接嵌入架构的路径。好消息是,Kafka 生态系统中可用的工具(Avro 架构注册表)透明地执行此操作。因此,与变体 1 相比,变体 2 以便利性为代价获得了效率。

    结果是,编码的 Avro 数据的“有线格式”看起来会有所不同,具体取决于您使用上面的(1)还是(2)。

    我对 Apache Nifi 不太熟悉,但是快速查看源代码(e.g. ConvertAvroToJSON.java)向我建议它使用变量 1,i.e。它将 Avro 架构与 Avro 记录一起嵌入。但是,您的使用者代码使用DecoderFactory.get().binaryDecoder(),因此使用变体 2(未嵌入任何模式)。

    也许这解释了你遇到的错误?

相关问题