首页 文章

scala-如何确认Kafka服务器(代理)中存在特定主题?

提问于
浏览
1

我正在使用scala,spark和Kafka . 我有两个问题 .

1.如何确认Kafka经纪人(服务器)中存在的主题?

2.如何确认Kafka服务器(引导服务器)是否正在运行?

object kafkaProducer extends App {

  def sendMessages(): Unit = {


//define topic
val topic = "spark-topic"       // how can i confirm this topic is exist in kafka server ? 

//define producer properties
val props = new java.util.Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("client.id", "KafkaProducer")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.connect.json.JsonSerializer")

//create producer instance
val kafkaProducer = new KafkaProducer[String, JsonNode](props)

//create object mapper
val mapper = new ObjectMapper with ScalaObjectMapper
mapper.registerModule(DefaultScalaModule)
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

//mapper Json object to string
      def toJson(value: Any): String = {
        mapper.writeValueAsString(value)
      }


//send producer message

    val jsonstring =
      s"""{
         | "id": "0001",
         | "name": "Peter"
         |}
      """.stripMargin

    val jsonNode: JsonNode = mapper.readTree(jsonstring)
    val rec = new ProducerRecord[String, JsonNode](topic, jsonNode)
    kafkaProducer.send(rec)
    //println(rec)

  }

}

2 回答

  • 0
    • 您可以使用AdminUtils类中的topicExists()进行检查:
    // zookeeper server e.g "localhost:2181"
    def existTopic(zkServer: String): Boolean = {
        val zkConnection = new ZkConnection(zkServer)
        val zkClient = new ZkClient(zkServer)
        AdminUtils.topicExists(new ZkUtils(zkClient, zkConnection, false), topic)
    }
    
    • 在ZkConnection类中使用getZookeeperState()
    def isAlive(zkServer: String): Boolean = {
        val zkConnection = new ZkConnection(zkServer)
        zkConnection.getZookeeperState.isAlive
    }
    
  • 0

    1)检查主题是否存在的推荐方法是使用AdminClient API .

    您可以使用listTopics()describeTopics() .

    2)假设您没有任何权限访问群集(检查指标或活动探测),检查群集正在运行的唯一方法是尝试连接/使用它 .

    使用AdminClient,您可以使用describeCluster()来尝试检索群集的状态 .

相关问题