首页 文章

Java 错误中的 Kafka Producer

提问于
浏览
0

我对卡夫卡很新。我在端口 2181 上运行了 Zookeeper 服务器,在端口 9092 上运行了 Kafka 服务器。我在 java 中编写了一个 Simple Producer。但无论何时运行程序,它都会显示以下错误:

USAGE: java [options] KafkaServer server.properties [--override property=value]*
    Option      Description                           
    ------      -----------                           
    --override  Optional property that should override values set in server.properties file

我正在使用带有 JDK 8 的 Netbeans IDE,并在库中包含了所有 Kafka jar 文件。我相信库文件中没有错误,因为代码构建正确但不运行。

这是 Simple Producer 代码:

package kafka;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import java.util.Properties;

public class Kafka {
    private static Producer<Integer, String> producer;
    private final Properties properties = new Properties();
    public Kafka() {
        properties.put("metadata.broker.list", "localhost:9092");
        properties.put("serializer.class", "kafka.serializer.StringEncoder");
        properties.put("request.required.acks", "1");
        producer = new Producer<>(new ProducerConfig(properties));
    }
    public static void main(String args[]) {
        Kafka k = new Kafka();
        String topic = "test";
        String msg = "hello world";
        KeyedMessage<Integer, String> data = new KeyedMessage<>(topic, msg);
        producer.send(data);
        producer.close();
    }
}

请帮助:)

2 回答

  • 1

    看起来 Netbeans 执行错误的类 - 不是你的kafka.Kafka类,而是KafkaServer(看起来这是 Kafka 本身的一个主要类)。请配置 Netbeans 以执行正确的类。

    我建议从汇合的例子的现有生产者样本开始,以及 re-use Maven 项目...

  • 0

    我认为你的生产者配置是错误的。以下是 Kafka 官方文档中的示例:

    Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("acks", "all");
     props.put("retries", 0);
     props.put("batch.size", 16384);
     props.put("linger.ms", 1);
     props.put("buffer.memory", 33554432);
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    

    只需为batch.sizebuffer.memory尝试较小的值。

相关问题