我在我的服务器机器上运行单节点kafka . 我使用以下命令创建主题“bin / kafka-topics.sh --create --zookeeper localhost:2181 --rerelication-factor 1 --partitions 1 --topic test” . 我有两个正在运行的logstash实例 . 第一个从一些java应用程序日志文件中读取数据,将相同内容注入kafka . 它工作正常,我可以使用“bin / kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning”命令在控制台上查看kafka中的数据 . 但另一个从kafka(同一主题“test”)读取并注入elasicsearch的logstash实例失败了 . 第二个logstash实例无法读取数据 . 我将其配置文件更改为从kafka读取并在控制台上打印,然后它也不输出任何内容 . 这是失败logstash的配置文件:
// config file
input {
kafka {
zk_connect => "localhost:2181"
topic_id => "test"
}
}
output {
stdout{}
}
Logstash既不打印也不抛出任何错误 . 我正在使用Logstash 2.4和kafka 0.10 . 我用过kafka快速入门指南(http://kafka.apache.org/documentation.html#quickstart)
3 回答
如果查看Kafka输入plugin configuration,您可以看到一个重要参数,它允许连接到Kafka集群:zk_connect .
根据文档,它默认设置为localhost:2181 . 确保将其设置为Kafka群集实例,或者理想情况下设置为多个实例,具体取决于您的设置 .
例如,假设您正在使用JSON主题连接到三节点Kafka群集 . 配置如下:
此外,为主题配置正确的编解码器也很重要 . 上面的示例将适用于JSON事件 . 如果您使用Avro,则需要设置另一个参数 - codec . 有关如何配置它的详细信息已有详细记录on the documentation page . 它基本上需要指向Avro模式文件,可以作为avsc文件或Schema Registry endpoints 给出(在我看来更好的解决方案) .
如果您在Kafka环境中运行了Schema Registry,则可以将编解码器指向它的URL . 一个完整的例子是:
希望它有效!
请查看Kafka输入配置选项https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html
请找到来自Kafka的数据的Logstash配置并将其推送到ELK Stack .
希望能帮助到你!