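I have been implementing a Spark Streaming job that reads a Kafka topic whose messages are JSON. I need a transformation that extracts information from each JSON object. After that, I need to check some values in a Cassandra table and then, depending on the JSON attributes, either update an existing row or insert a new row in Cassandra. Here is the simplified code: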

import ...

object SparkStreamingTest {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
      .setAppName("SparkStreamingTest")
      .setMaster("local[*]")
      .set("spark.cassandra.connection.host", "172.16.133.11")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))
    val kafkaConf = Map(
      ...
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaConf)
    )

    messages.map(_.value)
      .flatMap(_.split("\n"))
      .foreachRDD { rdd =>
        rdd.foreachPartition { itr =>
          while (itr.hasNext) {
            val line = itr.next()
            val json: Option[Any] = JSON.parseFull(line)
            val map = json.get.asInstanceOf[Map[String, Any]]

            val id = map.get("id").get.toString
            // This statement runs on an executor but refers to the driver-side `sc`
            sc.cassandraTable("dunro", "sample").select("id", "value").where("id = ?", id).limit(5)
            // Do something
          }
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}

When I run this code, I get the following error:

18/01/27 16:20:40 ERROR JobScheduler: Error running job streaming job 1517057440000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at ...

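It says that the variable sc in the line:

sc.cassandraTable("dunro", "sample").select("id", "value").where("id = ?", id).limit(5)

is not serializable. The closure passed to foreachPartition has to be serialized and shipped to the executors, but sc is only accessible inside the driver, so the task cannot be serialized.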
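The same failure can be reproduced without Spark. The sketch below is only an illustration and rests on two assumptions: that closures are serialized with plain Java serialization (which, as far as I know, is Spark's default for tasks), and that a captured variable behaves like a field of the task object. DriverOnlyHandle, LookupTask and PlainTask are made-up stand-ins, not Spark classes.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object ClosureSerializationDemo {
  // Hypothetical stand-in for a driver-only object such as SparkContext.
  // It deliberately does NOT extend Serializable.
  class DriverOnlyHandle {
    def lookup(id: String): String = s"row-$id"
  }

  // A task that holds the handle as a field, much as a closure captures `sc`.
  class LookupTask(val handle: DriverOnlyHandle) extends Serializable {
    def run(id: String): String = handle.lookup(id)
  }

  // A task that carries only plain, serializable data.
  class PlainTask(val keyspace: String, val table: String) extends Serializable {
    def run(id: String): String = s"$keyspace.$table/$id"
  }

  // Returns true if Java serialization accepts the object and everything it references.
  def canSerialize(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try { out.writeObject(obj); true }
    catch { case _: NotSerializableException => false }
    finally out.close()
  }

  def main(args: Array[String]): Unit = {
    // Fails: the non-serializable handle is dragged along with the task.
    println(canSerialize(new LookupTask(new DriverOnlyHandle)))   // false
    // Succeeds: only strings travel with the task.
    println(canSerialize(new PlainTask("dunro", "sample")))       // true
  }
}
```

Marking the task itself as Serializable is not enough; every object it references has to be serializable as well, which is why capturing sc breaks the whole closure.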

To sum up, what can I do to select from or insert into a Cassandra table inside the foreachRDD / foreachPartition loop?
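One direction that seems to avoid capturing sc is the connector's CassandraConnector, which is serializable and opens a session on the executor. What follows is a rough, untested sketch rather than a confirmed fix. It assumes the spark-cassandra-connector is on the classpath, that id is a text column in dunro.sample, and that the per-record lookup is acceptable performance-wise. The object and method names (CassandraLookupSketch, lookupPerRecord) are invented for illustration.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.DStream
import scala.util.parsing.json.JSON

object CassandraLookupSketch {

  // `lines` is assumed to be the stream of JSON strings (one object per element).
  def lookupPerRecord(lines: DStream[String], conf: SparkConf): Unit = {
    // Built on the driver; the connector (not `sc`) is what the closure captures.
    val connector = CassandraConnector(conf)

    lines.foreachRDD { rdd =>
      rdd.foreachPartition { itr =>
        // One session per partition, obtained on the executor.
        connector.withSessionDo { session =>
          // Assumption: `id` is a text column, so a String can be bound to it.
          val select = session.prepare(
            "SELECT id, value FROM dunro.sample WHERE id = ? LIMIT 5")

          itr.foreach { line =>
            JSON.parseFull(line) match {
              case Some(fields: Map[String, Any] @unchecked) =>
                fields.get("id").foreach { id =>
                  val rows = session.execute(select.bind(id.toString)).all()
                  // Decide here whether to update the existing row or insert a new one,
                  // for example with another prepared statement on the same session.
                }
              case _ =>
                // Skip lines that are not JSON objects.
            }
          }
        }
      }
    }
  }
}
```

With the code from the question, this would be called as CassandraLookupSketch.lookupPerRecord(messages.map(_.value).flatMap(_.split("\n")), conf) before ssc.start(). Note that JSON.parseFull returns numbers as Double, so a numeric id of 42 would become "42.0" after toString and may need explicit handling.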