首页 文章

java.lang.UnsupportedOperationException:'不允许写入非空的Cassandra表

提问于
浏览
5

我有一个场景,我将接收由我的火花流程序处理的流数据,并且每个间隔的输出将附加到我现有的cassandra表中 .

目前我的火花流程序将生成一个数据框,我需要保存在我的cassandra表中 . 我目前面临的问题是当我使用下面的命令时,我无法将数据/行附加到我现有的cassandra表中

dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save()

我已经阅读了以下链接http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/,他将mode = "append"传递给了save方法但是它的抛出语法错误

此外,我能够从下面的链接了解我需要修复的地方https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM

如何解决这个问题需要帮助 . 我正在scala中编写我的spark流媒体作业

1 回答

  • 8

    我想你必须按照以下方式做到:

    dff.write.format("org.apache.spark.sql.cassandra").mode(SaveMode.Append).options(Map("table" -> "xxx", "yyy" -> "retail")).save()
    

    cassandra处理数据的方式迫使你做所谓的“upserts” - 你必须记住插入可能会覆盖已经存储的记录的主键与插入的reccord的主键相同的一些行 . Cassandra是一个“快速写入”的数据库,因此它不会在写入之前检查数据是否存在 .

相关问题