首页 文章

Datastax spark cassandra连接器 - 将DF写入cassandra表

提问于
浏览
2

我们最近使用Scala,Spark和Cassandra开始了大数据项目,我是所有这些技术的新手 . 我正在尝试写入简单的任务并从cassandra表中读取 . 如果我将属性名称和列名都保存为小写或蛇形(unserscores),我能够实现这一点,但我想在我的scala代码中使用camel case . 有没有更好的方法来实现这一点,使用Scala中的camel case格式和cassandra中的snake case .

我们正在使用

scala - 2.10.5 spark - 1.6.2 datastax spark-cassandra-connector - 1.6.0 cassandra - 3.0.9.1346 datastax enterprise - 5.0.3

Cassandra 表

CREATE TABLE dev.castable (
id int PRIMARY KEY,
long_name text,
name text,
short_name text)

Scala代码

val conf = new SparkConf()
        .setAppName("TestHelper")
        .setMaster("local")
        .set("spark.cassandra.connection.host","127.0.01")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    println("writing data to cassandra")
    val df = sqlContext.createDataFrame(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
    df.write //*** this is not working
      .cassandraFormat("castable", "dev")
      .mode(SaveMode.Append)
      .save()

    println("reading data from cassandra") //*** This is working fine
    val rdd = sc.cassandraTable[MyRow]("dev", "castable")
    rdd.foreach(println)

例外

Exception in thread "main" java.util.NoSuchElementException: Columns not found in table dev.castable: longName, shortName
at com.datastax.spark.connector.SomeColumns.selectFrom(ColumnSelector.scala:38)
at com.datastax.spark.connector.writer.TableWriter$.apply(TableWriter.scala:268)
at com.datastax.spark.connector.RDDFunctions.saveToCassandra(RDDFunctions.scala:36)
at org.apache.spark.sql.cassandra.CassandraSourceRelation.insert(CassandraSourceRelation.scala:67)
at org.apache.spark.sql.cassandra.DefaultSource.createRelation(DefaultSource.scala:85)
at org.apache.spark.sql.execution.datasources.ResolvedDataSource$.apply(ResolvedDataSource.scala:222)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:148)
at com.aktana.spark.util.LocalTestDriver$.main(LocalTestDriver.scala:38)

我读到spark-cassandra-connector会自动自动执行此转换,但它对我不起作用 . datastax spark-cassandra-connector

2 回答

  • 0

    您的MyRow定义似乎与cassandra表定义不匹配 . 试试这个:

    val df = List((1, "My Long Description", "My Name", "My Short Name")).toDF("id", "long_name", "name", "short_name")
    
  • 0

    使用RDD,spark-cassandra-connector自动将camel cased属性转换为下划线列名 . 再次感谢RussS

    这是我如何将case类对象保存到cassandra表

    val writeRDD = sc.makeRDD(List(new MyRow(2,Option("long name"), "ss", Option("short name"))))
        writeRDD.saveToCassandra("dev", "castable")
    

相关问题