首页 文章

Kafka 与avro记录

提问于
浏览
1

我有以下内容:来源 - kafka主题(反式) Channels - 内存接收器 - Hdfs(avro_event)

kafka topic trans中的数据是使用c#producer编写的,有数千条avro记录 . 当我运行我的水槽消费者时,它开始将数据下沉到hdfs . 问题是数据的格式是:架构数据架构数据

代替:

架构数据数据

我猜这是因为flume期待带有 的记录类型,而来自kafka的数据将只是我知道有一种方法可以将avro数据包装成一个主题avroFlumeEvent但似乎它不再是真正的avro记录,也许火花消费者或风暴将更喜欢真正的avro数据 . 有没有办法处理这个主题,所以每次水槽将数据滚动到hdfs时,数据都是在没有多个模式的情况下编写的?

2 回答

  • 0

    我们实际上最终得到了这个 . 我们在C# 生产环境 者中使用microsoft .NET avro库而不是apache avro库 . 这意味着avro记录已正确序列化 . 我还需要更改flume接收器以使用“org.apache.flume.sink.hdfs.AvroEventSerializer $ Builder”作为接收器序列化器而不是“avro_event” . 我还需要包含一个连接到kafka源的水槽拦截器,它将变量“flume.avro.schema.url”推入水槽标头,稍后由hdfs sink序列化器使用 .

    我看了一下camus但是对于我们试图实现的东西似乎有点过分了,这是一个连接到kafka主题的基本水槽通道,它将avro数据汇入hdfs .

    我刚从我的java应用程序中删除了拦截器位,该应用程序构建了水槽配置,希望它可以帮助遇到此问题的其他人:

    _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId +".interceptors",_interceptorId);           
                    _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".type","static");
                    _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".key","flume.avro.schema.url");
                    _flumeFileConfigProperties.put(_agentId+".sources." + _sourceId + ".interceptors." + _interceptorId + ".value",_avroProdSchemaLocation +_databaseName + "/" + _topic + "/record/" + _schemaVersion + "/" + _topicName + ".avsc");
    
  • -1

    一旦你将数据放在kafka上,你有没有考虑过使用LinkedIn的Camus?它将运行mapreduce作业,但您应该获得所需的架构数据数据布局 . 您还应该查看Confluent的kafka堆栈,尤其是它提供的架构注册表以及它提供的其余API .

相关问题