首页 文章

未能生成嵌入式 Kafka 代理

提问于
浏览
1

我正在尝试使用 kafka 的kafka.zk.EmbeddedZookeeperkafka.utils.TestUtils/createServer返回的kafka.server.KafkaServer来运行 kafka 服务器以进行测试。

但是我遇到了试图发送消息超时的障碍,并返回KafkaProducer$Future失败。下面是我正在使用的 kafka 版本。下面的代码是 Clojure 与 Kafka 库的互操作。

[org.apache.kafka/kafka_2.11 "0.10.0.1"]
[org.apache.kafka/kafka-clients "0.10.1.0"]

这是我得到了多远。

  • Zookeeper 端口是随机分配的(请参阅这里)。

  • 可以使用netcat成功创建 Zookeeper 服务器并连接到它。

  • 可以成功创建主题。

  • 可以使用 netcat 成功创建 Kafka 代理并连接到它。

  • 步骤 5 是进程失败的地方。

这个所以问题表明传递正确的Time对象很重要。但是MockTime看起来像是一个合理的实现。以前有人解决了这个问题吗

;; 1. Create Zookeeper
(require '[clojure.test :refer :all]
     '[kafkaesque.topics :as kt]
     '[kafkaesque.utils :as ku]
     '[clojure.pprint :refer [pprint]])

(import '[java.nio.file Files]
    '[kafka.zk EmbeddedZookeeper]
    '[kafka.server KafkaServer KafkaConfig]
    '[kafka.utils TestUtils Time MockTime])

(def zk-config {:zkhost "127.0.0.1"})
(def topic-name "client-test")
(def ^EmbeddedZookeeper zkServer (EmbeddedZookeeper.))

;; 2. Create Kafka Broker
(def zk-connect-str (str "127.0.0.1" ":" (.port zkServer)))
(def zku ((ZkUtils/apply (ZkUtils/createZkClient zk-connect-str 10000 8000) false)))
(def brokerhost "127.0.0.1")
(def brokerport "9092")

(def ^KafkaConfig config (KafkaConfig. {"zookeeper.connect" zk-connect-str
                       "broker.id" "0"
                       "log.dirs" (.toString
                           (.toAbsolutePath
                            (Files/createTempDirectory
                             "kafka-" (make-array java.nio.file.attribute.FileAttribute 0))))
                       "listeners" (str "PLAINTEXT://"  brokerhost  ":"  brokerport)}))

(def ^Time mock (MockTime.))
(def ^KafkaServer kafkaServer (TestUtils/createServer config mock))

;; 3. Create a Topic
(kt/create! zku topic-name 1 1 {})
(kt/topic-exists? zku topic-name)   ;; returns true

;; 4. Create a Producer and ProducerRecord
(def producer-a (kc/producer {"bootstrap.servers" "127.0.0.1:9092"
                 "acks"              "all"
                 "retries"           "0"
                 "batch.size"        "16384"
                 "linger.ms"         "1"
                 "buffer.memory"     "33554432"
                 "key.serializer"    "org.apache.kafka.common.serialization.StringSerializer"
                 "value.serializer"  "org.apache.kafka.common.serialization.StringSerializer"}))

(def message-key "k1")
(def message-value "foobar")
(def record-a (kc/producer-record topic-name 0 message-key message-value))

;; 5. Send a message
(def send-result (kc/send! producer-a record-a))  ;; Times out, and returns a KafkaProducer$Future failure.

1 回答

  • 0

    谢谢你 to_1_for 指出我的阅读障碍:)问题是版本不匹配。我正在使用 org.apache.kafka/kafka_2.11 “0.10.0.1”,org.apache.kafka/kafka-clients**“0.10.1.0”**。

    我将所有版本移动到**“0.10.1.0”**,一切都按预期工作。

    希望这可以帮助。

相关问题