首页 文章

Flume Kafka HDFS:拆分消息

提问于
浏览
1

我有以下flume代理配置来读取来自kafka源的消息并将它们写回HDFS接收器

tier1.sources  = source1
tier 1.channels = channel1
tier1.sinks = sink1

tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.zookeeperConnect = 192.168.0.100:2181
tier1.sources.source1.topic = test
tier1.sources.source1.groupId = flume
tier1.sources.source1.channels = channel1
tier1.sources.source1.interceptors = i1
tier1.sources.source1.interceptors.i1.type = timestamp
tier1.sources.source1.kafka.consumer.timeout.ms = 100

tier1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
tier1.channels.channel1.brokerList = 192.168.0.100:9092

tier1.channels.channel1.topic = test
tier1.channels.channel1.zookeeperConnect = 192.168.0.100:2181/kafka
tier1.channels.channel1.parseAsFlumeEvent = false

tier1.sinks.sink1.channel = channel1
tier1.sinks.sink1.type = hdfs
tier1.sinks.sink1.hdfs.writeFormat = Text
tier1.sinks.sink1.hdfs.fileType = DataStream
tier1.sinks.sink1.hdfs.filePrefix = test-kafka
tier1.sinks.sink1.hdfs.fileSufix = .avro
tier1.sinks.sink1.hdfs.useLocalTimeStamp = true
tier1.sinks.sink1.hdfs.path = /tmp/kafka/%y-%m-%d
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.rollSize=0

如果每个轮询周期只有一个kafka消息到达,则kafka消息内容是正确序列化到文件中的avro数据 .

当两个kafka消息到达同一批次时,它们被分组在同一个HDFS文件上,因为avro消息包含两个架构数据,结果文件包含架构数据架构数据,导致它成为无效的.avro文件 .

如何拆分avro事件以将不同的kafka消息拆分为将其中的每一个写入不同的文件

谢谢

1 回答

  • 0

    一种方法:假设您将源kafka传入数据称为“SourceTopic” . 您可以在此“SourceTopic”中注册自定义接收器 .

    <FlumeNodeRole>.sinks.<your-sink>.type =net.my.package.CustomSink
    

    在CustomSink中,您可以编写一种方法来区分传入的消息,将其拆分并重新发送到不同的“DestinationTopic” . 这个'DestinationTopic'现在可以作为文件序列化的新水槽来源 .

    请参阅下面的管道水槽链接:https://flume.apache.org/FlumeUserGuide.html

相关问题