我有一个数据帧,我想写入Cassandra表 . 数据框由4,680,820行组成 . 数据本身来自.csv文件,大约650MB大小,并由来自datastax的spark Cassandra连接器读取 .

然后,使用Spark Cassandra连接器将数据写入cassandra . 连接器会写入记录但不是全部 . 它目前只写了17349条记录 . 我应该如何优化写入并写入所有460万条记录 . 我有8个Exeuctors,每个 Actuator 有4个核心,所以今天有28个任务可以并行运行 .

我正在使用Cassandra 3.0.13和Spark 2.1.0以及Spark Cassandra Connector:spark-cassandra-connector-assembly-2.0.1

这是我正在使用的代码:

def writeToCassandra(df: DataFrame, tableName: String) {
    if (df.take(1).length>0){
        println("Number of rows in DF are: "+ df.count);
        df.write
        .format("org.apache.spark.sql.cassandra")
        .mode("overwrite")
        .options(Map(
            "header" -> "true",
            "output.batch.grouping.key" -> "none",
            "cassandra.output.throughput_mb_per_sec" -> "1MB",
            "table" -> tableName, 
            "keyspace" -> "vardb_irc", 
            "cluster" -> "CassandraPOCluster")
            )
        .save()
    }
    else{
        println("Dataframe is empty");
    }
}

通过以下方法调用上述方法 .

def runCSVDataSourceImport(){
    // Load the CM data into with SPARkSQL
    val adErDF = spark.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .option("inferSchema", "true")
      .load("file:///opt/risk-4.20170511.csv")

   println("DF: adErDF ready to use")
   println(adErDF.count);
   val adErDFRepartition = adErDF.repartition(3)
   adErDFRepartition.explain
   writeToCassandra(adErDFRepartition, "risk_4");
}

谢谢