更新#2 - 我相信我找到了解决方案,我正在回答这个问题的完整性 . 似乎我需要使用我的实例公共IP地址和端口9092设置以下配置 .

advertised.host.name
advertised.port

我正在运行运行版本0.9.0.1的双节点Kafka服务器 . 我已经完成了快速入门并测试了我可以在内部发送和接收消息,就像在第一个Kafka服务器节点上生成消息并在第二个Kafka服务器节点上消费它一样 . 两个节点都在同一网络上 .

然后,我尝试从Kafka节点所在的网络外部的外部盒子中生成一些消息 . 我已确保相应的端口已打开并经过测试,我可以使用端口9092上的telnet从我的 生产环境 者计算机上打开这两个框 .

一切似乎都很好;我的小型Java应用程序将毫无错误地发送消息,但消费者从未收到任何消息 . 我检查了Kafka日志,没有任何东西 . 是否需要在server.properties文件中设置任何其他配置才能使Kafka服务器从其内部网络之外的 生产环境 者处使用?

更新**这个问题似乎在我的制片人身上被隔离了 . 当Kafka在运行Java 生产环境 者代码的同一台机器上本地运行时,它按预期工作 . 当我运行Java代码并向外部机器生成相同的消息时,该消息不包含在producer.send调用中 .

我已通过运行以下命令验证了这一点 .

sudo tcpdump -n -i en0 -xX port 9092

这会监视我的 生产环境 者机器在端口9092上发出的流量 . 我可以看出这个主题,但显然不存在消息 .

这是我为制作人使用的代码:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.storm.shade.org.yaml.snakeyaml.Yaml;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.util.Map;
import java.util.Properties;

import static constants.Constants.*;

public class TestProducer {

    // variable declarations
    private KafkaProducer<String, String> _producer;
    private Properties _props;
    private String _topic;

    public TestProducer(String configPath, String topic) {

        this._props = initialize(configPath);
        this._producer = new KafkaProducer<>(_props);
        this._topic = topic;

    }

    private Properties initialize(String configPath) {

        Yaml yaml = new Yaml();
        Properties props = new Properties();

        try {

            InputStream is = new FileInputStream(new File(configPath));

            // Parse the YAML file and return the output as a series of Maps and Lists
            Map<String, Object> config = (Map<String, Object>) yaml.load(is);

            props.put("bootstrap.servers", config.get(PROD_BOOTSTRAP_SERVERS));
            props.put("acks", config.get(PROD_ACKS));
            props.put("retries", config.get(PROD_RETRIES));
            props.put("batch.size", config.get(PROD_BATCH_SIZE));
            props.put("linger.ms", config.get(PROD_LINGER_MS));
            props.put("buffer.memory", config.get(PROD_BUFFER_MEMORY));
            props.put("key.serializer", config.get(PROD_KEY_SERIALIZER));
            props.put("value.serializer", config.get(PROD_VALUE_SERIALIZER));

        } catch( FileNotFoundException e ){
                LOG.error("FileNotFoundException: could not initialize Properties");
        }

        return props;
    }

    public void produce(String message) {

        try {

            //verify that we have supplied a topic
            if( _topic != null && !_topic.isEmpty() ) {
                System.out.println("Producing message:" + message);
                _producer.send(new ProducerRecord<String, String>(_topic, message));
            }

        } catch (Throwable throwable) {

            System.out.printf("%s", throwable.getStackTrace());

        }
    }

    public void close() {
        _producer.close();
    }
}