我在更新键空间中的表时遇到了scala上的spark cassandra连接器问题
这是我的一段代码
val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
" SET a= a + " + b + " WHERE x=" +
x + " AND y=" + y +
" AND z=" + x
println(query)
val KeySpace = new CassandraSQLContext(sparkContext)
KeySpace.setKeyspace(KEYSPACE)
hourUniqueKeySpace.sql(query)
当我执行此代码时,我收到这样的错误
Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found
知道为什么会这样吗?我怎样才能解决这个问题?
2 回答
通过spark-cassandra-connector可以更新带有counter column的表 . 您必须使用DataFrames和DataFrameWriter方法保存模式"append"(或SaveMode .Append,如果您愿意) . 检查代码DataFrameWriter.scala .
例如,给出一个表:
代码应该如下所示:
更新后:
通过隐式转换RDD to a DataFrame:
import sqlContext.implicits._
并使用.toDF()
,可以更简单地进行DataFrame转换 .检查此玩具应用程序的完整代码:https://github.com/kyrsideris/SparkUpdateCassandra/tree/master
由于版本在这里非常重要,以上内容适用于Scala 2.11.7,Spark 1.5.1,spark-cassandra-connector 1.5.0-RC1-s_2.11,Cassandra 3.0.5 . 自
@since 1.4.0
以来,DataFrameWriter被指定为@Experimental
.我相信您无法通过SPARK连接器本机更新 . 见documention:
"The default behavior of the Spark Cassandra Connector is to overwrite collections when inserted into a cassandra table. To override this behavior you can specify a custom mapper with instructions on how you would like the collection to be treated."
因此,您希望实际使用现有密钥插入新记录 .