如何使用动态DNS设置公共Kafka代理?

loading...


0

我使用3个Zookeepers和每个代理配置了一个带有3个代理的Kafka集群 . 下图显示了我的集群的图形表示 .

使用主机 192.168.0.10 在同一网络中进行的 生产环境 者和消费者测试通过 kafka-console-producerkafka-console-consumer 命令完美地工作 .

基于该上下文,当我尝试通过Internet通过 kafka-console-producer.sh --broker-list DYNAMIC_DNS_ADDR:30192,DYNAMIC_DNS_ADDR:30292,DYNAMIC_DNS_ADDR:30392 --topic twitter_tweets 生成一些数据时,我收到以下错误:

[2018-12-10 09:59:20,772] ERROR使用key发送消息到主题twitter_tweets时出错:null,值:16个字节,错误:(org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)org . apache.kafka.common.errors.TimeoutException:对于twitter_tweets-1过期1条记录:自创建批处理以及延迟时间[2018-12-10 09:59:22,273]以来已经过了1505 ms WARN [Producer clientId = console- 生产环境 者]无法 Build 与节点1的连接 . 经纪可能无法使用 . (org.apache.kafka.clients.NetworkClient)

Broker侦听器配置有以下属性:

listeners=PLAINTEXT://0.0.0.0:9092,SSL://0.0.0.0:9443
advertised.listeners=PLAINTEXT://192.168.0.241:9092,SSL://192.168.0.241:9443

显然,每个代理都为 advertised.listeners 属性更改了IP地址 . 我正在使用 CentOS 6.10Kafka 2.0.1 进行设置 . telnet测试工作正常 . Kafka REST代理端口的另一个转发是通过Internet工作并列出所有主题 .

loading...

1回答

  • 1

    https://rmoff.net/2018/08/02/kafka-listeners-explained/

    您需要两个侦听器 - 一个响应并通告内部地址,一个用于外部地址 .

    关键是你的客户端连接的监听器将返回该监听器的 host address and port .

    目前,您将外部设备欺骗到内部设备,因此您的外部流量会触及内部监听器 .

    你需要这样的东西(根据每个经纪人的需要改变 aws_internal_listener 的IP /主机名):

    KAFKA_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://192.168.0.241:29092
    
    KAFKA_ADVERTISED_LISTENERS: aws_internal_listener://192.168.0.241:9092,external_listener://DYNAMIC_DNS_ADDR:29092
    
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: aws_internal_listener:PLAINTEXT,external_listener:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: aws_internal_listener
    

    然后, DYNAMIC_DNS_ADDR 的端口转发器应将连接重定向到AWS节点上的29092 . 关键是外部连接不应该终止于与内部侦听器匹配的主机上的侦听器端口(它通告内部 192.168.0 地址)

    使用 kafkacat -L -b DYNAMIC_DNS_ADDR:29092 来调试和验证您的配置,如in the article here所述 .

评论

暂时没有评论!