我正在创建一个Flume代理:源(source)从Kafka主题中读取数据,通道(channel)使用Kafka主题,接收器(sink)是HDFS。该代理应当从Kafka主题获取数据,并将其写入HDFS中按天划分的文件夹。我使用的是Cloudera发行版。目前它没有摄取任何数据,也没有给出任何错误消息,而数据确实在成功写入该主题。

# Components of the "sandbox" agent: one source, one channel, one sink.
sandbox.sources  = kafka
sandbox.channels = channel
sandbox.sinks    = sink_to_hdfs

# Source "kafka": a KafkaSource that reads the Kafka topic named "topic"
# as consumer group "flume" and hands events to the channel named "channel".
sandbox.sources.kafka.type             = org.apache.flume.source.kafka.KafkaSource
sandbox.sources.kafka.channels         = channel
sandbox.sources.kafka.zookeeperConnect = zookeeper_address:2181/kafka
sandbox.sources.kafka.topic            = topic
sandbox.sources.kafka.groupId          = flume


# Channel "channel": a KafkaChannel that buffers events in the Kafka topic
# "kafka-channel-01" (a different topic from the one the source reads).
sandbox.channels.channel.type             = org.apache.flume.channel.kafka.KafkaChannel
sandbox.channels.channel.topic            = kafka-channel-01
sandbox.channels.channel.brokerList       = kafkabroker_server:9092
sandbox.channels.channel.zookeeperConnect = zookeeper_address:2181/kafka


# Sink "sink_to_hdfs": drains the channel named "channel" into HDFS as plain
# text, one directory per source topic and per calendar day.
sandbox.sinks.sink_to_hdfs.type = hdfs
sandbox.sinks.sink_to_hdfs.channel = channel

# Every HDFS-sink option must carry the "hdfs." prefix. The original key
# "writeFormat" (no prefix) is not a documented sink property; the documented
# key is "hdfs.writeFormat".
sandbox.sinks.sink_to_hdfs.hdfs.writeFormat = Text
sandbox.sinks.sink_to_hdfs.hdfs.fileType = DataStream

# %{topic} is replaced by the event's "topic" header; %Y-%m-%d is resolved
# from the event's "timestamp" header.
# NOTE(review): both headers are expected to be set by the Kafka source --
# confirm for the Flume version in use; if "timestamp" is missing, consider
# setting hdfs.useLocalTimeStamp = true.
sandbox.sinks.sink_to_hdfs.hdfs.path = /user/myname/flume/%{topic}/%Y-%m-%d

# File rolling: roll every 10 seconds or at 8,000,000 bytes, whichever comes
# first; rollCount = 0 disables rolling by event count. Files with no writes
# for 3600 seconds are closed.
sandbox.sinks.sink_to_hdfs.hdfs.rollInterval = 10
sandbox.sinks.sink_to_hdfs.hdfs.rollSize = 8000000
sandbox.sinks.sink_to_hdfs.hdfs.rollCount = 0
sandbox.sinks.sink_to_hdfs.hdfs.idleTimeout = 3600

# Capacity settings for the channel named "channel".
# NOTE(review): capacity / transactionCapacity are documented for the memory
# and file channels; the Kafka channel documentation does not list them --
# confirm whether they have any effect on a KafkaChannel.
sandbox.channels.channel.capacity            = 10000
sandbox.channels.channel.transactionCapacity = 1000