首页 文章

只有一个文件来自kafka的hdfs与flume

提问于
浏览
1

我正试图通过flume将数据放入kafka的hdfs中 . kafka_producer每10秒发送一条消息 . 我要在hdfs上的一个文件中收集所有消息 . 这是我使用的水槽的配置,但它在hdfs上存储了许多文件(一个用于消息):

agent1.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.kafka-source.zookeeperConnect = localhost:2181
agent1.sources.kafka-source.topic = prova
agent1.sources.kafka-source.groupId = flume
agent1.sources.kafka-source.channels = memory-channel
agent1.sources.kafka-source.interceptors = i1
agent1.sources.kafka-source.interceptors.i1.type = timestamp
agent1.sources.kafka-source.kafka.consumer.timeout.ms = 100
agent1.channels.memory-channel.type = memory
agent1.channels.memory-channel.capacity = 10000
agent1.channels.memory-channel.transactionCapacity = 1000
agent1.sinks.hdfs-sink.type = hdfs
agent1.sinks.hdfs-sink.hdfs.path = hdfs://localhost:9000/input
agent1.sinks.hdfs-sink.hdfs.rollInterval = 5
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 0
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.channel = memory-channel
agent1.sources = kafka-source
agent1.channels = memory-channel
agent1.sinks = hdfs-sink

附:我从file.csv开始 . kafka 生产环境 者获取文件并选择一些感兴趣的字段,然后每隔10秒发送一个条目 . Flume将条目存储在hadoop hdfs上,但是在许多文件中(1个条目= 1个文件) . 我希望所有条目都在一个文件中 . 怎么改变水槽的配置?

2 回答

  • 0

    目前确实设置了水槽来为每个输入文件在HDFS上创建一个文件 .

    正如here建议的那样,你可以通过编写一个定期的pig(或mapreduce)作业来处理这个问题,该作业接收所有输入文件并将它们组合起来 .

    减少文件数量的附加选项可能是降低入站文件的频率 .

  • 0

    将rollInterval Build 为0,因为您不希望根据时间生成不同的文件 . 如果要根据数字输入或事件进行更改,请更改rollCount值 . 例如,如果要将10个事件或条目保存在一个文件中:

    agent1.sinks.hdfs-sink.hdfs.rollInterval = 0
    agent1.sinks.hdfs-sink.hdfs.rollSize = 0
    agent1.sinks.hdfs-sink.hdfs.rollCount = 10
    

相关问题