我有一个数据帧,我想写入Cassandra表 . 数据框由4,680,820行组成 . 数据本身来自.csv文件,大约650MB大小,并由来自datastax的spark Cassandra连接器读取 .
然后,使用Spark Cassandra连接器将数据写入cassandra . 连接器会写入记录但不是全部 . 它目前只写了17349条记录 . 我应该如何优化写入并写入所有460万条记录 . 我有8个Exeuctors,每个 Actuator 有4个核心,所以今天有28个任务可以并行运行 .
我正在使用Cassandra 3.0.13和Spark 2.1.0以及Spark Cassandra Connector:spark-cassandra-connector-assembly-2.0.1
这是我正在使用的代码:
def writeToCassandra(df: DataFrame, tableName: String) {
if (df.take(1).length>0){
println("Number of rows in DF are: "+ df.count);
df.write
.format("org.apache.spark.sql.cassandra")
.mode("overwrite")
.options(Map(
"header" -> "true",
"output.batch.grouping.key" -> "none",
"cassandra.output.throughput_mb_per_sec" -> "1MB",
"table" -> tableName,
"keyspace" -> "vardb_irc",
"cluster" -> "CassandraPOCluster")
)
.save()
}
else{
println("Dataframe is empty");
}
}
通过以下方法调用上述方法 .
def runCSVDataSourceImport(){
// Load the CM data into with SPARkSQL
val adErDF = spark.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("inferSchema", "true")
.load("file:///opt/risk-4.20170511.csv")
println("DF: adErDF ready to use")
println(adErDF.count);
val adErDFRepartition = adErDF.repartition(3)
adErDFRepartition.explain
writeToCassandra(adErDFRepartition, "risk_4");
}
谢谢