我有一个场景,我将接收由我的火花流程序处理的流数据,并且每个间隔的输出将附加到我现有的cassandra表中 .
目前我的火花流程序将生成一个数据框,我需要保存在我的cassandra表中 . 我目前面临的问题是当我使用下面的命令时,我无法将数据/行附加到我现有的cassandra表中
dff.write.format("org.apache.spark.sql.cassandra").options(Map("table" -> "xxx", "yyy" -> "retail")).save()
我已经阅读了以下链接http://rustyrazorblade.com/2015/08/migrating-from-mysql-to-cassandra-using-spark/,他将mode = "append"传递给了save方法但是它的抛出语法错误
此外,我能够从下面的链接了解我需要修复的地方https://groups.google.com/a/lists.datastax.com/forum/#!topic/spark-connector-user/rlGGWQF2wnM
如何解决这个问题需要帮助 . 我正在scala中编写我的spark流媒体作业
1 回答
我想你必须按照以下方式做到:
cassandra处理数据的方式迫使你做所谓的“upserts” - 你必须记住插入可能会覆盖已经存储的记录的主键与插入的reccord的主键相同的一些行 . Cassandra是一个“快速写入”的数据库,因此它不会在写入之前检查数据是否存在 .