首页 文章

加载Avro时,Kafka Connect S3 sink会抛出IllegalArgumentException

提问于
浏览
0

我正在使用qubole's S3 sink以Parquet格式将Avro数据加载到S3中 .

在我的Java应用程序中,我创建一个 生产环境 者

Properties props = new Properties();
props.put("bootstrap.servers", KafkaHelper.getServers());
props.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
return new KafkaProducer<byte[], byte[]>(props);

然后将 GenericRecord 转换为 byte[] 格式:

GenericRecord avroRecord = new GenericData.Record(avroSchema);
Injection<GenericRecord, byte[]> recordInjection = GenericAvroCodecs.toBinary(avroSchema);

for (Map.Entry<String, ?> entry : map.entrySet()) {
    String key = entry.getKey();
    Object value = entry.getValue();
    avroRecord.put(key, value);
}

ProducerRecord<byte[], byte[]> record = new ProducerRecord<>(topic, recordInjection.apply(avroRecord));
producer.send(record);

我在Kafka Connect属性中使用以下值:

key.converter=com.qubole.streamx.ByteArrayConverter
value.converter=com.qubole.streamx.ByteArrayConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

以及我的文件接收器属性中的以下配置选项:

connector.class=com.qubole.streamx.s3.S3SinkConnector
format.class=io.confluent.connect.hdfs.parquet.ParquetFormat

当我运行连接器时,我收到以下错误消息:'java.lang.IllegalArgumentException:Avro架构必须是记录' .

我是Kafka Connect的新手,我知道可以设置一个Schema Registry服务器 - 但我不明白接收器是否需要注册表将Avro数据转换为Parquet或者这是某种我的格式或配置问题 . “记录”在此错误的上下文中引用了哪种数据格式?任何方向或帮助将不胜感激 .

1 回答

  • 4

    ByteArrayConverter 不会进行任何数据转换:它不是实际进行任何序列化/反序列化,而是假设连接器知道如何处理原始 byte[] 数据 . 但是, ParquetFormat (实际上大多数格式)无法处理原始数据 . 相反,他们希望数据被反序列化并结构化为记录(您可以将其视为C结构,POJO等) .

    请注意,qubole streamx README指出 ByteArrayConverter 在您可以安全地直接复制数据的情况下非常有用 . 例如,如果您将数据作为JSON或CSV . 这些don 't need deserialization because the bytes for each Kafka record'的值可以简单地复制到输出文件中 . 在这些情况下,这是一个很好的优化,但通常不适用于所有输出文件格式 .

相关问题