I'm working on a project that saves Spark DataFrames to Cassandra, and I'm getting an exception about an invalid row size (see below). I tried tracing through the connector code, and it looks like the row size (3 below) doesn't match the number of columns (which comes out as 1). I tried to follow the example in https://github.com/datastax/spark-cassandra-connector/blob/master/doc/2_loading.md, except that my Customer has two more fields rather than just the id used in that example. I've searched around but haven't found a solution.

I'm using Spark 2.3.0 and spark-cassandra-connector:2.3.0-s_2.11.

A bit of background: saving the DataFrame to Cassandra directly does work, but it is slow, so I wanted to see whether repartitionByCassandraReplica would make it faster. I've already tried various combinations of batch row size, concurrent writers, and so on for the DataFrame write, but it's still very slow, which is why I'm now trying repartitionByCassandraReplica before saving to the Cassandra table. If there are other options for saving a DataFrame to Cassandra faster, please let me know. (If I remove the repartitionByCassandraReplica call, the save to Cassandra works.)
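For reference, this is roughly the direct save path I've been using (customers is the DataFrame defined in the scenario below). A minimal sketch only: the tuning values are examples I've experimented with, not recommendations, and I'm assuming the spark.cassandra.output.* settings can be passed as --conf flags when starting the shell.

// spark-shell --packages datastax:spark-cassandra-connector:2.3.0-s_2.11 \
//   --conf spark.cassandra.connection.host=127.0.0.1 \
//   --conf spark.cassandra.output.batch.size.rows=100 \
//   --conf spark.cassandra.output.concurrent.writes=16
import org.apache.spark.sql.cassandra._

// cassandraFormat(table, keyspace) points the writer at the connector's DataFrame source;
// SaveMode.Append is needed because the table already exists.
customers.write
  .cassandraFormat("customer", "test")
  .mode("append")
  .save()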

Here's my scenario:

Cassandra table in the test keyspace:

create table customer (customer_id text primary key, order_id int, value int);

Spark shell commands:

import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._

case class Customer(customer_id: String, order_id: Int, value: Int)

val customers = Seq(Customer("1", 1, 1), Customer("2", 2, 2)).toDF("customer_id", "order_id", "value")
val customersRdd = customers.rdd.repartitionByCassandraReplica("test", "customer")
customersRdd.saveToCassandra("test", "customer")

At this point I get an exception:

java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
	at scala.Predef$.require(Predef.scala:224)
	at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:23)
	at com.datastax.spark.connector.writer.SqlRowWriter.readColumnValues(SqlRowWriter.scala:12)
	at com.datastax.spark.connector.writer.BoundStatementBuilder.bind(BoundStatementBuilder.scala:99)
	at com.datastax.spark.connector.rdd.partitioner.TokenGenerator.getPartitionKeyBufferFor(TokenGenerator.scala:38)
	at com.datastax.spark.connector.rdd.partitioner.ReplicaPartitioner.getPartition(ReplicaPartitioner.scala:70)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:151)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.scheduler.Task.run(Task.scala:108)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
18/07/18 10:27:51 ERROR Executor: Exception in task 1.0 in stage 6.0 (TID 4)
java.lang.IllegalArgumentException: requirement failed: Invalid row size: 3 instead of 1.
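In case it's relevant: the example in 2_loading.md that I was following calls repartitionByCassandraReplica on an RDD of case-class instances rather than on DataFrame Rows. Here's a sketch of how I read that pattern applied to my table; whether mapping to the case class first would avoid this row-size check is only my assumption, I haven't verified it.

// Sketch of the 2_loading.md pattern: map each Row to the Customer case class
// before repartitioning. Assumption: the column-name-based row writer used for
// case classes may behave differently from the one used for sql Rows.
val customersCaseClassRdd = customers.rdd
  .map(r => Customer(r.getString(0), r.getInt(1), r.getInt(2)))
  .repartitionByCassandraReplica("test", "customer")
customersCaseClassRdd.saveToCassandra("test", "customer")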

Thanks for your help.