
Kafka: unable to send messages to Kafka running inside Docker


docker-compose.yml (from https://github.com/wurstmeister/kafka-docker):

version: "2.1"
services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
  kafka:
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

Following https://kafka.apache.org/quickstart, I get an error when trying to produce a message:

~/kafka_2.11-1.0.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
>gh 
>[2018-01-19 17:28:15,385] ERROR Error when sending message to topic test with key: null, value: 2 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-0: 1566 ms has passed since batch creation plus linger time

Listing the topics:

~/kafka_2.11-1.0.0$ bin/kafka-topics.sh --list --zookeeper localhost:2181
__consumer_offsets
test

Why does this happen? Thanks.

UPDATE

How should I set KAFKA_ADVERTISED_HOST_NAME, or configure the network, so that my Python/Java program or kafka-console-producer.sh (running outside the Docker container) can produce messages to Kafka via localhost:9092?

UPDATE

The following docker-compose.yml seems to work:

version: "2"

services:
  zookeeper:
    image: "wurstmeister/zookeeper:latest"
    network_mode: "host"
    ports:
      - 2181:2181
  kafkaserver:
    image: "wurstmeister/kafka:latest"
    network_mode: "host"
    ports:
      - 9092:9092
    environment:
      KAFKA_CREATE_TOPICS: "test:3:1"
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181

1 Answer


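    I ran into the same problem. The syntax suggested in the kafka-docker README does not match the provided docker-compose.yml, which does not work as shipped. I finally found this post, and a variation of the docker-compose.yml file updated by BEA worked for me. Thanks!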

    Here are the details.

    I am running wurstmeister/kafka-docker on an Ubuntu 16.04 virtual machine image set up as described in https://bertrandszoghy.wordpress.com/2018/05/03/building-the-hyperledger-fabric-vm-and-docker-images-version-1-1-from-scratch/

    My docker-compose.yml file:

    version: '2'
    services:
      zookeeper:
        image: "wurstmeister/zookeeper:latest"
        network_mode: "host"
        ports:
          - "2181:2181"
      kafka:
        image: "wurstmeister/kafka:latest"
        network_mode: "host"
        ports:
          - 9092:9092
        environment:
          KAFKA_LISTENERS: PLAINTEXT://:9092
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://172.17.0.1:9092
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_CREATE_TOPICS: "BertTopic:3:1"
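
The address in KAFKA_ADVERTISED_LISTENERS is what the broker hands back to clients in its metadata, so it has to be reachable from wherever the producer runs. Below is a minimal sketch, using only Node's built-in net module, for checking that from the client machine. The helper names parseListener and canConnect are hypothetical and are not part of kafka-node or Kafka itself.

```javascript
var net = require('net');

// Split a listener such as 'PLAINTEXT://172.17.0.1:9092' into its parts.
function parseListener(listener) {
    var match = /^([A-Za-z0-9_]+):\/\/([^:]*):(\d+)$/.exec(listener);
    if (!match) {
        throw new Error('Unrecognized listener: ' + listener);
    }
    return { protocol: match[1], host: match[2], port: parseInt(match[3], 10) };
}

// Resolve to true if a TCP connection opens within timeoutMs, false otherwise.
// This only proves the port is open, not that a healthy broker is behind it.
function canConnect(host, port, timeoutMs) {
    return new Promise(function (resolve) {
        var socket = net.connect({ host: host, port: port });
        var finish = function (ok) {
            socket.destroy();
            resolve(ok);
        };
        socket.setTimeout(timeoutMs, function () { finish(false); });
        socket.once('connect', function () { finish(true); });
        socket.once('error', function () { finish(false); });
    });
}
```

For example, calling parseListener('PLAINTEXT://172.17.0.1:9092') and passing the resulting host and port to canConnect with a 2000 ms timeout should resolve to true on a machine that can reach the broker. If it resolves to false, the producer will most likely time out in the same way as in the question.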
    

    On the same VM I installed NodeJs:

    curl -sL https://deb.nodesource.com/setup_6.x | sudo -E bash -
    sudo apt-get install -y nodejs
    cd
    mkdir nodecode
    cd nodecode
    sudo npm install -g node-pre-gyp
    sudo npm install kafka-node
    

    Then I ran the following program to produce a couple of messages:

    // kafka.Client() takes a ZooKeeper connection string and defaults to localhost:2181.
    var kafka = require('kafka-node'),
        Producer = kafka.Producer,
        client = new kafka.Client(),
        producer = new Producer(client),
        payloads = [
            { topic: 'BertTopic', messages: 'first test message', partition: 0 },
            { topic: 'BertTopic', messages: 'second test message', partition: 0 }
        ];

    // Send only once the producer has connected and is ready.
    producer.on('ready', function () {
        producer.send(payloads, function (err, data) {
            if (err) {
                console.log('ERROR: ' + err.toString());
                process.exit(1);
            }
            console.log(data);
            process.exit(0);
        });
    });

    producer.on('error', function (err) {
        console.log('ERROR: ' + err.toString());
    });
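
The send callback receives an error as its first argument. Below is a minimal sketch that wraps the callback-style send(payloads, cb) in a Promise, so that a failed send surfaces as a rejection. sendAsync is a hypothetical helper, not part of kafka-node. It accepts any object that exposes a send method with that signature.

```javascript
// Wrap a callback-style producer.send(payloads, cb) in a Promise.
// 'producer' is any object with that method, e.g. a kafka-node Producer.
function sendAsync(producer, payloads) {
    return new Promise(function (resolve, reject) {
        producer.send(payloads, function (err, data) {
            if (err) {
                reject(err);   // e.g. a timeout or a broker that cannot be reached
            } else {
                resolve(data); // the per-topic result object
            }
        });
    });
}
```

Inside the 'ready' handler this could be used as sendAsync(producer, payloads).then(console.log, function (err) { console.log('ERROR: ' + err); }).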
    

    The output is:

    { BertTopic: { '0': 0 } }
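
The result appears to map each topic to an object keyed by partition, whose value is an offset (here, partition 0 of BertTopic at offset 0). Assuming that shape holds, here is a small sketch that flattens it into rows for logging. flattenSendResult is a hypothetical helper, not part of kafka-node.

```javascript
// Flatten { topic: { partition: offset } } into an array of rows.
function flattenSendResult(result) {
    var rows = [];
    Object.keys(result).forEach(function (topic) {
        Object.keys(result[topic]).forEach(function (partition) {
            rows.push({
                topic: topic,
                partition: parseInt(partition, 10), // keys arrive as strings
                offset: result[topic][partition]
            });
        });
    });
    return rows;
}

console.log(flattenSendResult({ BertTopic: { '0': 0 } }));
```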
    

    I ran a second NodeJs program to consume the (latest) messages:

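    var kafka = require('kafka-node'),
        Consumer = kafka.Consumer,
        client = new kafka.Client(),
        // The third Consumer argument is a single options object.
        options = {
            autoCommit: false,
            fromOffset: 'latest'
        },
        consumer = new Consumer(
            client,
            [
                { topic: 'BertTopic', partition: 0 }
            ],
            options
        );

    // Print each message as it arrives.
    consumer.on('message', function (message) {
        console.log(message);
    });

    consumer.on('error', function (err) {
        console.log('ERROR: ' + err.toString());
    });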
    

    The output is:

    { topic: 'BertTopic',
      value: 'first test message',
      offset: 0,
      partition: 0,
      highWaterOffset: 2,
      key: null }
    { topic: 'BertTopic',
      value: 'second test message',
      offset: 1,
      partition: 0,
      highWaterOffset: 2,
      key: null }
    

    I also have a third NodeJs program that displays all historical messages in a topic, listed in my blog post: https://bertrandszoghy.wordpress.com/2017/06/27/nodejs-querying-messages-in-apache-kafka/

    Hope this helps someone.
