I want to use a Spark job to migrate data from a table in one Cassandra cluster to another.

I wrote the code below. It works fine when the source and target tables are on the same host, but it fails when the source and target are on different hosts. I am able to read from the target table, but when I try to write the result set to the target table I get an error like "Invalid argument" and a stage failure. I have cross-checked the number of parameters, and all the details appear correct.
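To make the cross-host case concrete, the flow I am after looks like the sketch below: identical read and write code, with only the connection options pointing at different clusters. "source-host" and "target-host" are placeholders, not my real addresses. The full program follows after the sketch.

import org.apache.spark.sql.SparkSession

// Sketch of the intended cross-cluster flow; "source-host" and "target-host"
// stand in for the two clusters' contact points (not my real addresses).
val spark = SparkSession.builder()
  .appName("Data_Migration_1")
  .master("local[*]")
  .getOrCreate()

val sourceDf = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("spark.cassandra.connection.host", "source-host") // source cluster
  .option("spark.cassandra.connection.port", "9042")
  .option("keyspace", "sparkdb")
  .option("table", "emp1")
  .load()

sourceDf.write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .option("spark.cassandra.connection.host", "target-host") // target cluster
  .option("spark.cassandra.connection.port", "9042")
  .option("keyspace", "sparkdb2")
  .option("table", "emp3")
  .save()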

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._
import org.apache.spark.sql.types.IntegerType

object Test2 {
  def main(args: Array[String]): Unit = {
    println("Hello World")

    val conf = new SparkConf().setAppName("Data_Migration_1").setMaster("local[*]")
    val sc = new SparkContext(conf)

    val spark = SparkSession
      .builder()
      .appName("Spark SQL data Migration program")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    val df_read1 = spark.read
                        .format("org.apache.spark.sql.cassandra")
                        .option("spark.cassandra.connection.host","127.0.0.132")
                        .option("spark.cassandra.connection.port","9042")
                        .option("keyspace","sparkdb")
                        .option("table","emp1")
                        .load()
    df_read1.show()   // show() prints the rows itself; wrapping it in println would just print "()"
    println("total Records in Table1 = " + df_read1.count())

    val df_read2 = spark.read
                        .format("org.apache.spark.sql.cassandra")                         
                        .option("spark.cassandra.connection.host","127.0.0.132")
                        .option("spark.cassandra.connection.port","9042")
                        .option("keyspace","sparkdb2")
                        .option("table","emp3")
                        .load()
    df_read2.show()
    println("total Records in Table2 = " + df_read2.count())

    println("cassandra Read Happened successfully");


    // Left anti join: keep rows from emp1 whose emp_id is missing from emp3
    val df3 = df_read1.join(df_read2, df_read1("emp_id") === df_read2("emp_id"), "leftanti")
    df3.show()
    println("total differenced Records in 2 Tables = " + df3.count())


    // Also dump the diff rows to a local CSV for inspection (built-in csv source in Spark 2.x)
    df3.coalesce(1).write.mode("overwrite").format("csv").save("C:/spark_windows_proj/DataM_HFT/File1.csv")

    df3.write
       .format("org.apache.spark.sql.cassandra")
       .mode("append")
       .option("confirm.truncate","true") // note: only takes effect with overwrite mode
       .option("spark.cassandra.connection.host","127.0.0.132")
       .option("spark.cassandra.connection.port","9042")
       .option("keyspace","sparkdb2")
       .option("table","emp3")
       .save()

    println("cassandra write Happened successfully");        
  }
}
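As a sanity check before the final save, I can compare the columns of the frame being written against the target table (df_read2 was loaded from that table, so its column list mirrors the table definition). A small check along these lines, reusing df3 and df_read2 from the code above:

// Sanity check: the frame being written should match the target table's columns
println("df3 columns:    " + df3.columns.mkString(", "))
println("target columns: " + df_read2.columns.mkString(", "))
df3.printSchema()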

Also, please let me know if there is a better way to achieve this. My real tables have 18 columns, including some timestamps and null values. When I try the insert I get the following error:

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 18 instead of 17.
    at scala.Predef$.require(Predef.scala:224)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
    at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
    at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:106)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.next(GroupingBatchBuilder.scala:31)
    at scala.collection.Iterator$class.foreach(Iterator.scala:891)
    at com.datastax.spark.connector.writer.GroupingBatchBuilder.foreach(GroupingBatchBuilder.scala:31)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:233)
    at com.datastax.spark.connector.writer.TableWriter$$anonfun$writeInternal$1.apply(TableWriter.scala:210)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:112)
    at com.datastax.spark.connector.cql.CassandraConnector$$anonfun$withSessionDo$1.apply(CassandraConnector.scala:111)
    at com.datastax.spark.connector.cql.CassandraConnector.closeResourceAfterUse(CassandraConnector.scala:145)
    at com.datastax.spark.connector.cql.CassandraConnector.withSessionDo(CassandraConnector.scala:111)
    at com.datastax.spark.connector.writer.TableWriter.writeInternal(TableWriter.scala:210)
    at com.datastax.spark.connector.writer.TableWriter.insert(TableWriter.scala:197)
    at com.datastax.spark.connector.writer.TableWriter.write(TableWriter.scala:183)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at com.datastax.spark.connector.RDDFunctions$$anonfun$saveToCassandra$1.apply(RDDFunctions.scala:36)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

18/08/20 15:40:20 INFO TaskSetManager: Starting task 119.0 in stage 24.0 (TID 575, localhost, executor driver, partition 119, ANY, 8082 bytes)
18/08/20 15:40:20 INFO Executor: Running task 119.0 in stage 24.0 (TID 575)
18/08/20 15:40:20 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 4 blocks
18/08/20 15:40:20 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/08/20 15:40:20 WARN TaskSetManager: Lost task 118.0 in stage 24.0 (TID 574, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 18 instead of 17.
    (stack trace identical to the one above)

18/08/20 15:40:20 INFO ShuffleBlockFetcherIterator: Getting 2 non-empty blocks out of 4 blocks
18/08/20 15:40:20 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms
18/08/20 15:40:20 ERROR TaskSetManager: Task 118 in stage 24.0 failed 1 times; aborting job
18/08/20 15:40:20 INFO TaskSchedulerImpl: Cancelling stage 24
18/08/20 15:40:20 INFO Executor: Executor is trying to kill task 119.0 in stage 24.0 (TID 575), reason: Stage cancelled
18/08/20 15:40:20 INFO TaskSchedulerImpl: Stage 24 was cancelled
18/08/20 15:40:20 INFO DAGScheduler: ResultStage 24 (runJob at RDDFunctions.scala:36) failed in 2.1 s due to Job aborted due to stage failure: Task 118 in stage 24.0 failed 1 times, most recent failure: Lost task 118.0 in stage 24.0 (TID 574, localhost, executor driver): java.lang.IllegalArgumentException: requirement failed: Invalid row size: 18 instead of 17.
    (stack trace identical to the one above)
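My reading of the message is that the DataFrame being saved carries 18 columns while the target table emp3 defines only 17, so the row writer refuses to bind the extra column. A minimal sketch of the workaround I am considering, assuming every target column name also exists in df3: prune the frame to the target table's column list (taken from df_read2, which was loaded from that table) before saving.

import org.apache.spark.sql.functions.col

// df_read2 was loaded from the target table, so its column list mirrors the
// table definition; select exactly those columns before writing
val aligned = df3.select(df_read2.columns.map(col): _*)

aligned.write
  .format("org.apache.spark.sql.cassandra")
  .mode("append")
  .option("spark.cassandra.connection.host", "127.0.0.132")
  .option("spark.cassandra.connection.port", "9042")
  .option("keyspace", "sparkdb2")
  .option("table", "emp3")
  .save()

The same pruning would also surface a misnamed column early, since the select fails fast on any name missing from df3.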