Creating a simple Kafka consumer with Scala

I am currently learning Scala and am trying to create a SimpleConsumer that retrieves messages from a Kafka partition.

The consumer should be able to handle the following tasks:

  • Keep track of the offsets.

  • Figure out which broker is the lead broker for the topic and partition.

  • Be able to handle broker leader changes.

I was able to find a very good document for writing this consumer in Java (https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example).

Does anyone have sample Scala code for this simple consumer, or can you point me to some documentation that would send me in the right direction? It would be much appreciated.

2 Answers

  • 0

    Here is sample code for a simple Kafka consumer written in Scala. After some trial and error, it works.

    package com.Kafka.Consumer
    
    import kafka.api.FetchRequest
    import kafka.api.FetchRequestBuilder
    import kafka.api.PartitionOffsetRequestInfo
    import kafka.common.ErrorMapping
    import kafka.common.TopicAndPartition
    import kafka.javaapi._
    import kafka.javaapi.consumer.SimpleConsumer
    import kafka.message.MessageAndOffset
    import java.nio.ByteBuffer
    import java.util.ArrayList
    import java.util.Collections
    import java.util.HashMap
    import java.util.List
    import java.util.Map
    import SimpleExample._
    
    //remove if not needed
    import scala.collection.JavaConversions._
    
    object SimpleExample {
    
      def main(args: Array[String]) {
        val example = new SimpleExample()
        val maxReads = java.lang.Integer.parseInt(args(0))
        val topic = args(1)
        val partition = java.lang.Integer.parseInt(args(2))
        val seeds = new ArrayList[String]()
        seeds.add(args(3))
        val port = java.lang.Integer.parseInt(args(4))
        try {
          example.run(maxReads, topic, partition, seeds, port)
        } catch {
          case e: Exception => {
            println("Oops:" + e)
            e.printStackTrace()
          }
        }
      }
    
      def getLastOffset(consumer: SimpleConsumer, 
          topic: String, 
          partition: Int, 
          whichTime: Long, 
          clientName: String): Long = {
        val topicAndPartition = new TopicAndPartition(topic, partition)
        val requestInfo = new HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1))
        val request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, clientName)
        val response = consumer.getOffsetsBefore(request)
        if (response.hasError) {
          println("Error fetching data Offset Data the Broker. Reason: " + 
            response.errorCode(topic, partition))
          return 0
        }
        val offsets = response.offsets(topic, partition)
        offsets(0)
      }
    }
    
    class SimpleExample {
    
      private var m_replicaBrokers: List[String] = new ArrayList[String]()
    
      def run(a_maxReads: Int, 
          a_topic: String, 
          a_partition: Int, 
          a_seedBrokers: List[String], 
          a_port: Int) {
        val metadata = findLeader(a_seedBrokers, a_port, a_topic, a_partition)
        if (metadata == null) {
          println("Can't find metadata for Topic and Partition. Exiting")
          return
        }
        if (metadata.leader == null) {
          println("Can't find Leader for Topic and Partition. Exiting")
          return
        }
        var leadBroker = metadata.leader.host
        val clientName = "Client_" + a_topic + "_" + a_partition
        var consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName)
        var readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.EarliestTime, clientName)
        var numErrors = 0
        //while (a_maxReads > 0) {
          if (consumer == null) {
            consumer = new SimpleConsumer(leadBroker, a_port, 100000, 64 * 1024, clientName)
          }
          val req = new FetchRequestBuilder().clientId(clientName).addFetch(a_topic, a_partition, readOffset, 
            100000)
            .build()
          val fetchResponse = consumer.fetch(req)
          if (fetchResponse.hasError) {
            numErrors += 1
            val code = fetchResponse.errorCode(a_topic, a_partition)
            println("Error fetching data from the Broker:" + leadBroker + 
              " Reason: " + 
              code)
            if (numErrors > 5) {
              // the original Java example breaks out of the read loop here
            }
            if (code == ErrorMapping.OffsetOutOfRangeCode) {
              readOffset = getLastOffset(consumer, a_topic, a_partition, kafka.api.OffsetRequest.LatestTime, clientName)
              //continue
            }
            consumer.close()
            consumer = null
            leadBroker = findNewLeader(leadBroker, a_topic, a_partition, a_port)
            //continue
          }
          numErrors = 0
          var numRead = 0
          for (messageAndOffset <- fetchResponse.messageSet(a_topic, a_partition)) {
            val currentOffset = messageAndOffset.offset
            if (currentOffset < readOffset) {
              println("Found an old offset: " + currentOffset + " Expecting: " + 
                readOffset)
              //continue
            }
            readOffset = messageAndOffset.nextOffset
            val payload = messageAndOffset.message.payload
            val bytes = Array.ofDim[Byte](payload.limit())
            payload.get(bytes)
            println(String.valueOf(messageAndOffset.offset) + ": " + new String(bytes, "UTF-8"))
            numRead += 1
           // a_maxReads -= 1
          }
          if (numRead == 0) {
            try {
              Thread.sleep(1000)
            } catch {
              case ie: InterruptedException => 
            }
          }
        //}
        if (consumer != null) consumer.close()
      }
    
      private def findNewLeader(a_oldLeader: String, 
          a_topic: String, 
          a_partition: Int, 
          a_port: Int): String = {
        for (i <- 0 until 3) {
          var goToSleep = false
          val metadata = findLeader(m_replicaBrokers, a_port, a_topic, a_partition)
          if (metadata == null) {
            goToSleep = true
          } else if (metadata.leader == null) {
            goToSleep = true
          } else if (a_oldLeader.equalsIgnoreCase(metadata.leader.host) && i == 0) {
            goToSleep = true
          } else {
            return metadata.leader.host
          }
          if (goToSleep) {
            try {
              Thread.sleep(1000)
            } catch {
              case ie: InterruptedException => 
            }
          }
        }
        println("Unable to find new leader after Broker failure. Exiting")
        throw new Exception("Unable to find new leader after Broker failure. Exiting")
      }
    
      private def findLeader(a_seedBrokers: List[String], 
          a_port: Int, 
          a_topic: String, 
          a_partition: Int): PartitionMetadata = {
        var returnMetaData: PartitionMetadata = null
    
        for (seed <- a_seedBrokers) {
          var consumer: SimpleConsumer = null
          try {
            consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024, "leaderLookup")
            val topics = Collections.singletonList(a_topic)
            val req = new TopicMetadataRequest(topics)
            val resp = consumer.send(req)
            val metaData = resp.topicsMetadata
            for (item <- metaData; part <- item.partitionsMetadata) {
              if (part.partitionId == a_partition) {
                returnMetaData = part
                //break
              }
            }
          } catch {
            case e: Exception => println("Error communicating with Broker [" + seed + "] to find Leader for [" + 
              a_topic + 
              ", " + 
              a_partition + 
              "] Reason: " + 
              e)
          } finally {
            if (consumer != null) consumer.close()
          }
        }
        if (returnMetaData != null) {
          m_replicaBrokers.clear()
          for (replica <- returnMetaData.replicas) {
            m_replicaBrokers.add(replica.host)
          }
        }
        returnMetaData
      }
    }
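
    For reference, here is a sketch of how the example above could be invoked; the argument order matches SimpleExample.main (max reads, topic, partition, seed broker host, port), and the broker host and topic name are placeholders, not values from the answer:

    // Hypothetical invocation: 10 reads from partition 0 of topic "test",
    // using localhost:9092 as the seed broker.
    SimpleExample.main(Array("10", "test", "0", "localhost", "9092"))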
    
  • 5

    I built a simple Kafka consumer and producer using Scala.

    Consumer:

    package com.kafka
    
    import java.util.concurrent._
    import java.util.{Collections, Properties}
    
    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    
    import scala.collection.JavaConversions._
    
    class Consumer(val brokers: String,
                   val groupId: String,
                   val topic: String) {
    
      val props = createConsumerConfig(brokers, groupId)
      val consumer = new KafkaConsumer[String, String](props)
      var executor: ExecutorService = null
    
      def shutdown() = {
        if (consumer != null)
          consumer.close()
        if (executor != null)
          executor.shutdown()
      }
    
      def createConsumerConfig(brokers: String, groupId: String): Properties = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000")
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000")
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
        props
      }
    
      def run() = {
        consumer.subscribe(Collections.singletonList(this.topic))
    
        executor = Executors.newSingleThreadExecutor
        executor.execute(new Runnable {
          override def run(): Unit = {
            while (true) {
              val records = consumer.poll(1000)
    
              for (record <- records) {
                System.out.println("Received message: (" + record.key() + ", " + record.value() + ") at offset " + record.offset())
              }
            }
          }
        })
      }
    }
    
    object Consumer extends App{
      val newArgs = Array("localhost:9092", "2","test")
      val example = new Consumer(newArgs(0), newArgs(1), newArgs(2))
      example.run()
    }
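
    The configuration above relies on auto-commit, so offsets are committed in the background every second. If you want to keep track of offsets yourself (the first requirement in the question), a minimal sketch with auto-commit disabled might look like the following; the broker address, group id, and topic name are placeholders:

    package com.kafka

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

    import scala.collection.JavaConversions._

    // Sketch: explicit offset tracking with the new consumer API instead of auto-commit.
    object ManualCommitConsumer extends App {
      val props = new Properties()
      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
      props.put(ConsumerConfig.GROUP_ID_CONFIG, "manual-commit-group")
      props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
      props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
      props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

      val consumer = new KafkaConsumer[String, String](props)
      consumer.subscribe(Collections.singletonList("test"))

      while (true) {
        val records = consumer.poll(1000)
        for (record <- records)
          println("offset " + record.offset() + ": " + record.value())
        // Commit the offsets returned by the last poll only after the records are processed.
        consumer.commitSync()
      }
    }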
    

    Producer:

    package com.kafka
    
    import java.util.{Date, Properties}
    
    import org.apache.kafka.clients.producer.KafkaProducer
    import org.apache.kafka.clients.producer.ProducerRecord
    
    object Producer extends App{
      val newArgs = Array("20","test","localhost:9092")
      val events = newArgs(0).toInt
      val topic = newArgs(1)
      val brokers = newArgs(2)
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("client.id", "producer")
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    
    
      val producer = new KafkaProducer[String, String](props)
      val t = System.currentTimeMillis()
      for (nEvents <- Range(0, events)) {
        val key = "messageKey " + nEvents.toString
        val msg = "test message"
        val data = new ProducerRecord[String, String](topic, key, msg)
    
        //async
        //producer.send(data, (m,e) => {})
        //sync
        producer.send(data)
      }
    
      System.out.println("sent per second: " + events * 1000 / (System.currentTimeMillis() - t))
      producer.close()
    }
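
    The commented-out "async" line above passes a Scala lambda where send expects an org.apache.kafka.clients.producer.Callback, which only compiles on Scala 2.12+ (SAM conversion). Here is a sketch of an asynchronous send with an explicit Callback that also works on older Scala versions; the helper name sendAsync is mine, not part of the answer:

    import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerRecord, RecordMetadata}

    // Sketch: asynchronous send; the callback runs when the broker acknowledges the record.
    def sendAsync(producer: KafkaProducer[String, String],
                  record: ProducerRecord[String, String]): Unit = {
      producer.send(record, new Callback {
        override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
          if (exception != null)
            exception.printStackTrace() // the send failed
          else
            println("acked: partition " + metadata.partition() + ", offset " + metadata.offset())
        }
      })
    }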
    
