
Updating a Cassandra table with the spark-cassandra-connector


I am having a problem with the spark-cassandra-connector on Scala while updating a table in my keyspace.

Here is my piece of code:

val query = "UPDATE " + COLUMN_FAMILY_UNIQUE_TRAFFIC + DATA_SET_DEVICE +
                        " SET a= a + " + b + " WHERE x=" +
                        x + " AND y=" + y +
                        " AND z=" + x

println(query)

val keySpace = new CassandraSQLContext(sparkContext)
keySpace.setKeyspace(KEYSPACE)

keySpace.sql(query)

When I execute this code, I get an error like this:

Exception in thread "main" java.lang.RuntimeException: [1.1] failure: ``insert'' expected but identifier UPDATE found

Any idea why this happens, and how can I fix it?

2 Answers

  • 2

It is possible to update a table with a counter column via the spark-cassandra-connector. You have to use DataFrames and the DataFrameWriter method save with mode "append" (or SaveMode.Append if you prefer). Check the code in DataFrameWriter.scala.

For example, given a table:

    cqlsh:test> SELECT * FROM name_counter ;
    
     name    | surname | count
    ---------+---------+-------
        John |   Smith |   100
       Zhang |     Wei |  1000
     Angelos |   Papas |    10
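
    Since count is a counter column, the table must have been created with the counter type; a plausible schema (an assumption, as the answer does not show the DDL) is:

    CREATE TABLE test.name_counter (
        name    text,
        surname text,
        count   counter,
        PRIMARY KEY (name, surname)
    );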
    

The code should look like this:

    // Imports needed for Row and the schema types
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

    // The deltas to add to each counter
    val updateRdd = sc.parallelize(Seq(Row("John",    "Smith", 1L),
                                       Row("Zhang",   "Wei",   2L),
                                       Row("Angelos", "Papas", 3L)))

    val tblStruct = new StructType(
        Array(StructField("name",    StringType, nullable = false),
              StructField("surname", StringType, nullable = false),
              StructField("count",   LongType,   nullable = false)))

    val updateDf = sqlContext.createDataFrame(updateRdd, tblStruct)

    // Mode "append" makes the connector save into the existing table,
    // which for a counter column adds the value to the current count
    updateDf.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "test", "table" -> "name_counter"))
      .mode("append")
      .save()
    

After the update (each value written has been added to the existing counter):

     name    | surname | count
    ---------+---------+-------
        John |   Smith |   101
       Zhang |     Wei |  1002
     Angelos |   Papas |    13
    

The RDD-to-DataFrame conversion can be simpler with the implicit conversions: import sqlContext.implicits._ and then use .toDF().
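
    A minimal sketch of that shortcut (assuming the same sc, sqlContext, keyspace and table as above):

    // The implicits bring .toDF() into scope for RDDs of tuples
    import sqlContext.implicits._

    val updateDf2 = sc.parallelize(Seq(("John", "Smith", 1L)))
                      .toDF("name", "surname", "count")

    updateDf2.write.format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "test", "table" -> "name_counter"))
      .mode("append")
      .save()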

Check the full code of this toy application: https://github.com/kyrsideris/SparkUpdateCassandra/tree/master

Since versions matter a lot here, the above applies to Scala 2.11.7, Spark 1.5.1, spark-cassandra-connector 1.5.0-RC1-s_2.11, Cassandra 3.0.5. Note that DataFrameWriter has been marked @Experimental since 1.4.0.

  • 5

I believe you cannot update natively through the Spark connector. See the documentation:

    "The default behavior of the Spark Cassandra Connector is to overwrite collections when inserted into a cassandra table. To override this behavior you can specify a custom mapper with instructions on how you would like the collection to be treated."

Therefore, what you want is to actually insert a new record with the existing key.
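
    A minimal sketch of such an insert-as-update via the connector's RDD API (keyspace, table and column names here are hypothetical):

    import com.datastax.spark.connector._

    // Writing a row whose primary key (x, y, z) already exists overwrites
    // column a in place: Cassandra inserts are upserts
    sc.parallelize(Seq((1, 2, 3, "newValue")))
      .saveToCassandra("my_keyspace", "my_table", SomeColumns("x", "y", "z", "a"))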
