
Unable to write/save data from a Spark RDD directly to Ignite


I am trying to write a DataFrame to Ignite using JDBC.

Spark version: 2.1

Ignite version: 2.3

JDK: 1.8

Scala: 2.11.8

Here is my code snippet:

def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {

  val conn = DataSource.conn
  var psmt:PreparedStatement = null

  try {
    OperationIgniteUtil.deleteIgniteData(conn,targetTable)

    hiveDF.foreachPartition({
      partitionOfRecords => {
        partitionOfRecords.foreach(
          row => for ( i <- 0 until row.length ) {
            psmt = OperationIgniteUtil.getInsertStatement(conn, targetTable, hiveDF.schema)
            psmt.setObject(i+1, row.get(i))
            psmt.execute()
          }
        )
      }
    })

  }catch {
    case e: Exception =>  e.printStackTrace()
  } finally {
    conn.close
  }
}

Then I run the Spark job and it prints this error message:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:924)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:923)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply$mcV$sp(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.Dataset$$anonfun$foreachPartition$1.apply(Dataset.scala:2305)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
    at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2765)
    at org.apache.spark.sql.Dataset.foreachPartition(Dataset.scala:2304)
    at com.pingan.pilot.ignite.common.OperationIgniteUtil$.WriteToIgnite(OperationIgniteUtil.scala:72)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite$.main(HdfsToIgnite.scala:36)
    at com.pingan.pilot.ignite.etl.HdfsToIgnite.main(HdfsToIgnite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:212)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:126)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.ignite.internal.jdbc2.JdbcConnection
Serialization stack:
    - object not serializable (class: org.apache.ignite.internal.jdbc2.JdbcConnection, value: org.apache.ignite.internal.jdbc2.JdbcConnection@7ebc2975)
    - field (class: com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, name: conn$1, type: interface java.sql.Connection)
    - object (class com.pingan.pilot.ignite.common.OperationIgniteUtil$$anonfun$WriteToIgnite$1, )
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 27 more

Does anyone know how I can fix it? Thanks!

2 Answers

  • 2

    You have to extend the Serializable interface.

    object Test extends Serializable { 
      def WriteToIgnite(hiveDF:DataFrame,targetTable:String):Unit = {
       ???
      }
    }
    

    I hope it solves your problem.

  • 0

    The problem here is that you cannot serialize the connection to Ignite, DataSource.conn. The closure you pass to forEachPartition contains the connection as part of its scope, which is why Spark cannot serialize it.
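
    To make the capture concrete: everything referenced inside a foreachPartition closure has to be serialized and shipped to the executors, and a live JDBC connection cannot be. As a side note (not part of this answer's suggestion; the URL, SQL, and helper name below are placeholders), the usual JDBC-only workaround is to open the connection inside the partition function so the closure never captures a driver-side connection. A minimal sketch:

    import java.sql.DriverManager
    import org.apache.spark.sql.DataFrame

    // Hypothetical helper: jdbcUrl and insertSql are illustrative placeholders.
    def writePartitions(df: DataFrame, jdbcUrl: String, insertSql: String): Unit = {
      df.foreachPartition { rows =>
        // The connection is created on the executor, inside the closure,
        // so the driver never needs to serialize a JdbcConnection.
        val conn = DriverManager.getConnection(jdbcUrl)
        try {
          val psmt = conn.prepareStatement(insertSql)
          rows.foreach { row =>
            for (i <- 0 until row.length) psmt.setObject(i + 1, row.get(i))
            psmt.addBatch()
          }
          psmt.executeBatch()
        } finally {
          conn.close()
        }
      }
    }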

    Fortunately, Ignite provides a custom implementation of RDD that allows you to save values to it. You need to create an IgniteContext first, then retrieve Ignite's shared RDD, which provides distributed access to Ignite, to save the Rows of your RDD:

    val igniteContext = new IgniteContext(sparkContext, () => new IgniteConfiguration())
    ...
    
    // Retrieve Ignite's shared RDD
    val igniteRdd = igniteContext.fromCache("partitioned")
    igniteRdd.saveValues(hiveDF.rdd)
    

    More information is available in the Apache Ignite documentation.
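
    For reference, here is a slightly fuller sketch of the same approach with the imports wired up. The Hive table name, the cache name "partitioned", and the use of a default IgniteConfiguration are assumptions carried over from the snippet above, not values taken from the question:

    import org.apache.ignite.configuration.IgniteConfiguration
    import org.apache.ignite.spark.{IgniteContext, IgniteRDD}
    import org.apache.spark.sql.{Row, SparkSession}

    object HiveToIgniteExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("hive-to-ignite").enableHiveSupport().getOrCreate()
        val hiveDF = spark.table("some_hive_table") // placeholder table name

        // The IgniteContext is created from a configuration closure rather than a
        // live connection, so Spark can serialize it and start Ignite on executors.
        val igniteContext = new IgniteContext(spark.sparkContext, () => new IgniteConfiguration())

        // "partitioned" must match a cache that is configured on the Ignite cluster.
        val igniteRdd: IgniteRDD[AnyRef, Row] = igniteContext.fromCache("partitioned")

        // saveValues streams each Row into the cache, generating keys automatically.
        igniteRdd.saveValues(hiveDF.rdd)

        igniteContext.close(false)
        spark.stop()
      }
    }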
