I am implementing Spark Streaming to read from a Kafka topic whose records are JSON. I need a transformation to extract the information from the JSON objects. After that, I need to check some values in a Cassandra table and then, depending on the JSON attributes, either update an existing row or insert a new one into Cassandra. Here is the simplified code:
import ...

object SparkStreamingTest {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("SparkStreamingTest")
      .setMaster("local[*]")
      .set("spark.cassandra.connection.host", "172.16.133.11")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(10))

    val kafkaConf = Map(
      ...
    )

    val messages = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("topic"), kafkaConf)
    )

    messages.map(_.value)
      .flatMap(_.split("\n"))
      .foreachRDD { rdd =>
        rdd.foreachPartition { itr =>
          while (itr.hasNext) {
            val line = itr.next
            val json: Option[Any] = JSON.parseFull(line)
            val map = json.get.asInstanceOf[Map[String, Any]]
            val id = map.get("id").get.toString
            sc.cassandraTable("dunro", "sample").select("id", "value").where("id = ?", id).limit(5)
            // Do something
          }
        }
      }

    ssc.start()
    ssc.awaitTermination()
  }
}
When I run this code, I get the following error:
18/01/27 16:20:40 ERROR JobScheduler: Error running job streaming job 1517057440000 ms.0
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:925)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at ...
It says that the variable sc on the line

sc.cassandraTable("dunro", "sample").select("id", "value").where("id = ?", id).limit(5)

is not serializable, so it cannot be serialized and shipped to the executors: the SparkContext is only accessible inside the driver.
To sum up: what can I do to select from / insert into a Cassandra table inside the foreachPartition loop of foreachRDD?
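
From what I have read so far, I suspect the fix is to capture something serializable instead of sc, e.g. CassandraConnector from the spark-cassandra-connector library. Below is a sketch of what I have in mind (untested; the per-partition session handling and the raw CQL strings are my own assumptions, not something I copied from the docs):

import scala.util.parsing.json.JSON
import com.datastax.spark.connector.cql.CassandraConnector

// CassandraConnector is serializable, so it can be created on the driver
// and captured by executor-side closures (unlike SparkContext)
val connector = CassandraConnector(sc.getConf)

messages.map(_.value)
  .flatMap(_.split("\n"))
  .foreachRDD { rdd =>
    rdd.foreachPartition { itr =>
      // open one CQL session per partition and reuse it for every record
      connector.withSessionDo { session =>
        itr.foreach { line =>
          val map = JSON.parseFull(line).get.asInstanceOf[Map[String, Any]]
          val id  = map("id").toString
          // check the current values for this id ...
          val rows = session.execute(
            "SELECT id, value FROM dunro.sample WHERE id = ? LIMIT 5", id)
          // ... then execute an INSERT or UPDATE depending on the result
        }
      }
    }
  }

Is that the right direction, or is there a more idiomatic way to do this with the connector?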