首页 文章

用于检查kafka代理(服务器)是否可用的Scala代码

提问于
浏览
-3

如何检查Kafka服务器是否可用 .

我试过下面的scala代码-Producer API .

val props = new Properties()
  props.put("bootstrap.servers", "hworker.dev.arch.mum.private:6667")
  props.put("acks", "1")
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
  props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")

我可以知道如何检查上述引导服务器是否可用 .

1 回答

  • 0

    一种选择是使用Kafka AdminClient,如下所示 . 如果服务器没有响应,它将返回TimeOutException .

    private static final TIMEOUT_MS = 5000;           
     try (AdminClient client = AdminClient.create(props)) {
                client.listTopics(new ListTopicsOptions().timeoutMs(TIMEOUT_MS)).listings().get();
            } catch (ExecutionException ex) {
                LOG.error("Kafka is not available, timed out after {} ms", TIMEOUT_MS);
                return;
            }
    

    附:它适用于Kafka 0.11版本 .

相关问题