首页 文章

Logsatsh和kafka

提问于
浏览
1

我在我的服务器机器上运行单节点kafka . 我使用以下命令创建主题“bin / kafka-topics.sh --create --zookeeper localhost:2181 --rerelication-factor 1 --partitions 1 --topic test” . 我有两个正在运行的logstash实例 . 第一个从一些java应用程序日志文件中读取数据,将相同内容注入kafka . 它工作正常,我可以使用“bin / kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning”命令在控制台上查看kafka中的数据 . 但另一个从kafka(同一主题“test”)读取并注入elasicsearch的logstash实例失败了 . 第二个logstash实例无法读取数据 . 我将其配置文件更改为从kafka读取并在控制台上打印,然后它也不输出任何内容 . 这是失败logstash的配置文件:

// config file
     input {
        kafka {
        zk_connect => "localhost:2181"
        topic_id => "test" 
        }
        }
        output {
        stdout{}
        }

Logstash既不打印也不抛出任何错误 . 我正在使用Logstash 2.4和kafka 0.10 . 我用过kafka快速入门指南(http://kafka.apache.org/documentation.html#quickstart

3 回答

  • 0

    如果查看Kafka输入plugin configuration,您可以看到一个重要参数,它允许连接到Kafka集群:zk_connect .

    根据文档,它默认设置为localhost:2181 . 确保将其设置为Kafka群集实例,或者理想情况下设置为多个实例,具体取决于您的设置 .

    例如,假设您正在使用JSON主题连接到三节点Kafka群集 . 配置如下:

    kafka {
     topic_id => "your_topic"
     zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
    }
    

    此外,为主题配置正确的编解码器也很重要 . 上面的示例将适用于JSON事件 . 如果您使用Avro,则需要设置另一个参数 - codec . 有关如何配置它的详细信息已有详细记录on the documentation page . 它基本上需要指向Avro模式文件,可以作为avsc文件或Schema Registry endpoints 给出(在我看来更好的解决方案) .

    如果您在Kafka环境中运行了Schema Registry,则可以将编解码器指向它的URL . 一个完整的例子是:

    kafka {
     codec => avro_schema_registry { endpoint => "http://kc1.host:8081"}
     topic_id => "your_topic"
     zk_connect => "kc1.host:2181,kc2.host:2181,kc3.host:2181"
    }
    

    希望它有效!

  • 1
    @wjp 
    Hi wjp, I am running single node kafka cluster. There is no Schema Registry running. zookeeper is also running.   I used following command to create topic "bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test".  I have two logstash instances running. First one reads data from some java application log file inject the same to the kafka. It works fine, I can see data in kafka on console using "bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning" command. But the other logstash instance which reads from kafka(same topic "test") and injects into elasicsearch, is failing. This second instance of logstash fails to read data. I changed its configuration file to read from kafka and print on console, then also it does not output anything.Here is the config file for failing logstash:
    input {
    kafka {
    zk_connect => "localhost:2181"
    topic_id => "test" 
    }
    }
    output {
    stdout{}
    }
    Logstash neither print anything nor it throws any error.
    I am using Logstash 2.4 and kafka 0.10.
    I used kafka quick start guide (http://kafka.apache.org/documentation.html#quickstart)
    
  • 1

    请查看Kafka输入配置选项https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html

    请找到来自Kafka的数据的Logstash配置并将其推送到ELK Stack .

    input {
      kafka {
        bootstrap_servers => "localhost:9092"
        topics => ["topic_name"]
      }
    }
    
    output{
        elasticsearch{
            hosts => ["http://localhost:9200/"]
            index => "index_name"
        }
    }
    

    希望能帮助到你!

相关问题