我试图在循环中加载数据文件(以检查统计信息)而不是Kafka中的标准输入 . 下载Kafka后,我执行了以下步骤:
动物园管理员:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动服务器:
bin/kafka-server-start.sh config/server.properties
创建了一个名为“test”的主题:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
跑到制片人:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
Test1
Test2
听取消费者的意见:
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
Test1
Test2
我希望将数据文件传递给Producer,而不是标准输入,消费者可以直接看到它 . 或者是否有任何kafka 生产环境 者而不是使用控制台消费者,我可以使用它来读取数据文件 . 真的很感激任何帮助 . 谢谢!
5 回答
您可以通过cat读取数据文件并将其传递给kafka-console-producer.sh .
如果总有一个文件,您可以使用tail命令然后将其传递给kafka控制台 生产环境 者 .
但是,如果在满足某些条件时创建新文件,则可能需要使用apache.commons.io.monitor来监视创建的新文件,然后重复上述步骤 .
您也可以尝试kafkacat实用程序 . Github上的自述文件提供了示例
如果您可以分享哪种工具最适合您,那将是很棒的:)
来自KafkaCat自述文件的详细信息:
从stdin读取消息,使用snappy压缩生成'syslog'主题
点击此链接:http://grokbase.com/t/kafka/users/157b71babg/kafka-producer-input-file
Kafka有这个内置的File Stream Connector,用于将文件的内容传递给 生产环境 者(文件源),或者将文件内容定向到另一个目的地(文件接收器) .
我们有
bin/connect-standalone.sh
从文件中读取,可以在config/connect-file-source.properties
和config/connect-standalone.properties
中配置 .所以命令将是: