我有一个现有的Kafka主题和一个从那里读取并写入HDFS的水槽代理 . 我想重新配置我的水槽代理,这样它就会远离现有的设置;一个Kafka源,文件通道到HDFS接收器,使用Kafka Channels .
我在cloudera documentation中读到可以通过仅使用Kafka通道和HDFS接收器(没有水槽源)实现这一点..(除非我得到了错误的结束 . )所以我尝试创建这个配置但是它isn 't working. It'甚至没有启动盒子上的水槽过程 .
# Test
test.channels = kafka-channel
test.sinks = hdfs-sink
test.channels.kafka-channel.type =
org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false
test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8082/data/test/
我正在使用:
-
HDP快速入门VM 2.6.3
-
Flume版本1.5.2
-
HDFS目录确实存在
-
ps -ef | grep flume
只在我添加kafka-source后才返回进程,但这不可能是正确的,因为这样做会为发布到主题上的任何消息创建一个无限循环 .
是否可以仅使用Kafka Channels 和HDFS接收器,或者我是否需要使用kafka-source但更改一些其他配置以防止无限循环的消息?
Kafka-source
- > kafka-channel
- > HDFS Sink
- 这对我来说似乎不对 .
2 回答
在挖了一下后,我注意到Ambari没有为指定的代理创建任何水槽配置文件 . 如果我指定
test.sources = kafka-source
,Ambari似乎只创建/更新flume配置 . 一旦我将其添加到水槽配置中(通过ambari),就会在盒子上创建配置,并且水槽代理成功启动 .最终的水槽配置看起来像这样:
注意我没有设置源上的任何属性(这会导致我在我的问题中提到的无限循环问题),它只需要提及所以Ambari创建了flume配置并启动代理 .
这并没有使用Apache Kafka这种模式最好用Kafka Connect(Apache Kafka的一部分)来解决 . 根据this guide here,有一个易于使用的Kafka Connect HDFS连接器 .