
How to store a Kafka direct stream of JSON into Cassandra?


I have to save Spark streaming data into Cassandra. The stream comes from Kafka, and the Kafka messages are JSON like the following:

{
  "status": "NOT_AVAILABLE",
  "itemid": "550672332",
  "qty": 0,
  "lmts": "2017-11-18T10:39:21-08:00",
  "timestamp": 1511030361000
}
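
For reference, extracting one such message into a matching case class with json4s looks like this (a standalone sketch; it only models the three fields stored below, and json4s ignores the extra ones):

import org.json4s._
import org.json4s.jackson.JsonMethods._

case class NliEvents(itemid: String, status: String, qty: String)

object ParseOneMessage extends App {
  // DefaultFormats also converts the numeric qty into the String field.
  implicit val formats: Formats = DefaultFormats

  val message =
    """{"status": "NOT_AVAILABLE", "itemid": "550672332", "qty": 0,
      |"lmts": "2017-11-18T10:39:21-08:00", "timestamp": 1511030361000}""".stripMargin

  println(parse(message).extract[NliEvents])
  // NliEvents(550672332,NOT_AVAILABLE,0)
}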

I wrote the following code against Spark 2.2.0.

case class NliEvents(itemid: String, status: String, qty: String)

def main(args: Array[String]): Unit = {
  // .....
  val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
  )

  val valueStream = stream.map(_.value())
  val cassandraCrud = new CassandraOperations
  import com.datastax.spark.connector._

  val columns = SomeColumns("itemid", "status", "qty")
  val keySpace = configuration.getString(env + ".cassandra.keyspace")
  val gson = new Gson()

  import org.json4s._
  import org.json4s.jackson.JsonMethods._
  implicit val formats = DefaultFormats

  valueStream.foreachRDD((rdd, time) => {
    if (!rdd.isEmpty()) {
      val mapped = rdd.map { records =>
        val json = parse(records)
        val events = json.extract[NliEvents]
        events
      }
      mapped.saveToCassandra(keySpace, "nli_events", columns)
    }
  })
}

When I run this code, I get the following error:

java.io.NotSerializableException: org.json4s.DefaultFormats$

Perhaps I am not doing this correctly.
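
The trace points at the implicit formats declared on the driver: extract inside the rdd.map closure references it, so Spark has to serialize the DefaultFormats singleton when it ships the task, and that object is not Serializable in this json4s version.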

1 Answer


    Can you replace your foreach statement with the code below?

    // saveToCassandra on a DStream needs the streaming half of the
    // connector; the code above only imports the RDD variant.
    import com.datastax.spark.connector.streaming._

    valueStream.mapPartitions { records =>
      // Declaring the formats inside the closure creates it on the
      // executor, so the non-serializable DefaultFormats singleton is
      // never captured from the driver.
      implicit val formats: Formats = DefaultFormats
      records.map(record => parse(record).extract[NliEvents])
    }.saveToCassandra(keySpace, "nli_events", columns)
    

    It should work. Let me know if you run into any errors.
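
    This works because the Formats instance is created inside the partition closure, on the executor, so Spark never has to serialize the DefaultFormats singleton from the driver; mapPartitions also keeps that setup to once per partition rather than once per record. If you would rather keep your foreachRDD structure, the same idea applies (a sketch against your code; just drop the driver-side implicit):

    valueStream.foreachRDD { (rdd, time) =>
      if (!rdd.isEmpty()) {
        rdd.map { record =>
          // Created inside the task, never serialized from the driver.
          implicit val formats: Formats = DefaultFormats
          parse(record).extract[NliEvents]
        }.saveToCassandra(keySpace, "nli_events", columns)
      }
    }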
