I am trying to write a DataFrame to Ignite via JDBC.
Spark version: 2.1
Ignite version: 2.3
JDK: 1.8
Scala: 2.11.8
Here is my code snippet:
def WriteToIgnite(hiveDF: DataFrame, targetTable: String): Unit = {
  val conn = DataSource.conn
  var psmt: PreparedStatement = null
  try {
    OperationIgniteUtil.deleteIgniteData(conn, targetTable)
    hiveDF.foreachPartition { partitionOfRecords =>
      partitionOfRecords.foreach { row =>
        for (i <- 0 until row.length) {
          psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
          psmt.setObject(i + 1, row.get(i))
          psmt.execute()
        }
      }
    }
  } catch {
    case e: Exception => e.printStackTrace()
  } finally {
    conn.close()
  }
}
Then I run Spark and it prints this error message:
org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304)
    at com.pingan.pilot.ignite.common.OperationIgniteUtil$.WriteToIgnite(OperationIgniteUtil.scala:72)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite$.main(HdfsToIgnite.scala:36)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite.main(HdfsToIgnite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.ignite.internal.jdbc2.JdbcConnection
Serialization stack:
    - object not serializable (class: org.apache.ignite.internal.jdbc2.JdbcConnection, value: org.apache.ignite.internal.jdbc2.JdbcConnection@7ebc2975)
    - field (class: com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, name: conn$1, type: interface java.sql.Connection)
    - object (class com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 27 more
Does anyone know how I can fix it? Thanks!
2 Answers
You have to extend the Serializable interface.
I hope it solves your problem.
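In practice, making surrounding classes Serializable cannot help here, because a JDBC connection itself is not serializable; the usual workaround is to create the connection inside foreachPartition so that each executor opens its own and the closure captures nothing non-serializable. A minimal sketch of that pattern (the JDBC URL and the generated INSERT statement are placeholders, not from the question's code):

```scala
// Sketch only: the connection is created inside foreachPartition, so it
// never needs to be serialized with the closure. URL is a placeholder.
hiveDF.foreachPartition { partitionOfRecords =>
  val conn = java.sql.DriverManager.getConnection("jdbc:ignite:thin://127.0.0.1/")
  try {
    partitionOfRecords.foreach { row =>
      // Build one INSERT with a "?" placeholder per column of the row.
      val psmt = conn.prepareStatement(
        s"INSERT INTO $targetTable VALUES (${Seq.fill(row.length)("?").mkString(", ")})")
      for (i <- 0 until row.length) psmt.setObject(i + 1, row.get(i))
      psmt.execute()
      psmt.close()
    }
  } finally {
    conn.close()
  }
}
```

Opening one connection per partition (rather than per row) keeps the connection count bounded by the number of partitions.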
The problem here is that you cannot serialize the connection to Ignite, DataSource.conn. The closure you provide to foreachPartition contains the connection as part of its scope, which is why Spark cannot serialize it.
Fortunately, Ignite provides a custom RDD implementation that allows you to save values to it. You first need to create an IgniteContext, then retrieve Ignite's shared RDD, which provides distributed access to Ignite so you can save your RDD's Rows.
More information is available in the Apache Ignite documentation.
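A minimal sketch of that shared-RDD approach (the Spring configuration path, cache name, and the choice of the row index as key are assumptions for illustration, not from the question):

```scala
// Sketch of the IgniteContext / shared RDD approach described above.
import org.apache.ignite.spark.IgniteContext
import org.apache.spark.sql.Row

// Config path and cache name below are placeholders.
val igniteContext = new IgniteContext(spark.sparkContext, "config/ignite-config.xml")
// fromCache returns an IgniteRDD backed by the named Ignite cache.
val sharedRDD = igniteContext.fromCache[Long, Row]("hiveRows")
// Key each Row by its index and write the pairs into the Ignite cache.
sharedRDD.savePairs(hiveDF.rdd.zipWithIndex().map { case (row, idx) => (idx, row) })
```

Because IgniteRDD handles distribution itself, no JDBC connection is captured by any Spark closure, which sidesteps the serialization error entirely.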