出于测试目的,我尝试在我的本地迷你管道上创建Kafka群集 . Kubernetes的集群 must be reachable from outside .

当我从pod内部 生产环境 /消费时没有问题,一切正常 .

当我从我的本地机器 生产环境 时

bin/kafka-console-producer.sh --topic mytopic --broker-list 192.168.99.100:32767

其中192.168.99.100是我的minikube-ip,32767是kafka服务的节点端口 .

我收到以下错误信息:

>testmessage
>[2018-04-30 11:55:04,604] ERROR Error when sending message to topic ams_stream with key: null, value: 11 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for ams_stream-0: 1506 ms has passed since batch creation plus linger time

当我从本地机器消耗时,我收到以下警告:

[2018-04-30 10:22:30,680] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-04-30 10:23:46,057] WARN Connection to node 8 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-04-30 10:25:01,542] WARN Connection to node 2 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2018-04-30 10:26:17,008] WARN Connection to node 5 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

经纪人ID是正确的,所以看起来我至少可以到达经纪人


Edit:

我认为问题可能是,该服务将“随机”路由到我的任何经纪人,但他需要将我引导到该主题的领导者 . 这可能是问题吗?有没有人知道解决这个问题的方法?


其他信息:

我正在使用wurstmeister/kafkadigitalwonderland/zookeeper图像

我开始使用DellEMC Tutorial(以及来自defuze.org的链接的)

这对我来说没有用,所以我在kafka-service.yml(1)和kafka-cluster.yml(2)中做了一些更改

kafka-service.yml

  • 添加了一个固定的NodePort

  • 从选择器中删除了id

kafka-cluster.yml

  • 为规范添加了副本

  • 从标签中删除了ID

  • 更改了由IP中的最后一个数字生成的代理ID

  • 将deprecated_host_name / advertised_port替换为不推荐使用的值

  • 听众(pod-ip:9092)用于在k8s内部进行通信

  • advertised_listeners(minikube-ip:node-port)用于与kubernetes之外的应用程序进行通信

1 - kafka-service.yml:

---
apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    name: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    nodePort: 32767
    targetPort: 9092
    protocol: TCP
  selector:
    app: kafka
  type: LoadBalancer

2 - kafka-cluster.yml:

---
kind: Deployment
apiVersion: extensions/v1beta1
metadata:
  name: kafka-b
spec:
  replicas: 3
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        ports:
        - containerPort: 9092
        env:
        - name: HOSTNAME_COMMAND
          value: "ifconfig |grep 'addr:172' |cut -d':' -f 2 |cut -d ' ' -f 1"
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: zk1:2181
        - name: BROKER_ID_COMMAND
          value: "ifconfig |grep 'inet addr:172' | cut -d'.' -f '4' | cut -d' ' -f '1'"
        - name: KAFKA_ADVERTISED_LISTENERS
          value: "INTERNAL://192.168.99.100:32767"
        - name: KAFKA_LISTENERS
          value: "INTERNAL://_{HOSTNAME_COMMAND}:9092"
        - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
          value: "INTERNAL:PLAINTEXT"
        - name: KAFKA_INTER_BROKER_LISTENER_NAME
          value: "INTERNAL"
        - name: KAFKA_CREATE_TOPICS
          value: mytopic:1:3