首页 文章

使用Datastax的Spark Cassandra Connector在TableDef上设置Cassandra Clustering Order

提问于
浏览
3

每次我尝试用新的 TableDef 在 Cassandra 中创建新表时,得到的聚类顺序都是升序,而我想要的是降序 .

我正在使用Cassandra 2.1.10,Spark 1.5.1和Datastax Spark Cassandra Connector 1.5.0-M2 .

我正在创建一个新的 TableDef

// Column groups for so.example. The ColumnDef calls below pass only a name, a role and a
// type; no ordering argument is given for the clustering column.
val partitionKeyColumns = Seq(ColumnDef("parkey", PartitionKeyColumn, TextType))
val clusteringKeyColumns = Seq(ColumnDef("ts", ClusteringColumn(0), TimestampType))
val dataColumns = Seq(ColumnDef("name", RegularColumn, TextType))

val table = TableDef("so", "example", partitionKeyColumns, clusteringKeyColumns, dataColumns)

// NOTE(review): the selected names ("key", "time") differ from the table's column names
// ("parkey", "ts") - confirm how the RDD fields are meant to map onto the table columns.
rdd.saveAsCassandraTableEx(table, SomeColumns("key", "time", "name"))

我期待在Cassandra看到的是

-- Desired schema: ts is the only clustering column and is ordered descending.
CREATE TABLE so.example (
    parkey text,
    ts timestamp,
    name text,
    PRIMARY KEY ((parkey), ts)
) WITH CLUSTERING ORDER BY (ts DESC);

我最终得到的是

-- Schema actually obtained: same columns and primary key, but ts is ordered ascending.
CREATE TABLE so.example (
    parkey text,
    ts timestamp,
    name text,
    PRIMARY KEY ((parkey), ts)
) WITH CLUSTERING ORDER BY (ts ASC);

如何强制它将聚类顺序设置为降序?

1 回答

  • 2

    我没能找到直接实现这一点的方法 . 此外,您可能还需要指定许多其他选项 . 最终我扩展了 ColumnDef 和 TableDef,并覆盖了 TableDef 中的 cql 方法 . 下面是我想出的解决方案示例 . 如果有人有更好的办法,或者这种用法已得到原生支持,我很乐意修改答案 .

    /**
     * Enum-like holder for the sort direction of a clustering column.
     *
     * The two members are `Ascending` (ordinal 0) and `Descending` (ordinal 1).
     */
    object ClusteringOrder {

      /**
       * Base type of the orderings.
       *
       * @param ordinal integer code of the ordering, as returned by `toInt` and
       *                accepted by `fromInt`
       */
      abstract sealed class Order(val ordinal: Int) extends Ordered[Order]
        with Serializable {

        // NOTE(review): the operands are swapped, so the member with the larger ordinal
        // compares as smaller (Descending < Ascending). Behaviour is kept unchanged here;
        // confirm that this reversed ordering is intentional.
        def compare(that: Order): Int = that.ordinal compare this.ordinal

        /** The integer code of this ordering. */
        def toInt: Int = this.ordinal
      }

      case object Ascending extends Order(0)
      case object Descending extends Order(1)

      /** All known orderings. Typed explicitly so the element type is not widened. */
      val values: Set[Order] = Set(Ascending, Descending)

      /**
       * Looks up the ordering with the given integer code.
       *
       * @param i ordinal to look up
       * @return the ordering whose `ordinal` equals `i`
       * @throws NoSuchElementException if no ordering has ordinal `i`
       */
      def fromInt(i: Int): Order =
        values.find(_.ordinal == i).getOrElse(
          // Same exception type that Option.get raised, but naming the offending value.
          throw new NoSuchElementException(s"No ClusteringOrder with ordinal $i"))
    }
    
    /**
     * A `ColumnDef` that additionally records the clustering order wanted for the column.
     *
     * The first four parameters are forwarded unchanged to the `ColumnDef` constructor.
     *
     * NOTE(review): equality, hashing and `copy` are inherited from `ColumnDef` and
     * presumably do not take `clusteringOrder` into account - confirm before relying on them.
     *
     * @param columnName      forwarded to `ColumnDef`
     * @param columnRole      forwarded to `ColumnDef`
     * @param columnType      forwarded to `ColumnDef`
     * @param indexed         forwarded to `ColumnDef`; `false` when omitted
     * @param clusteringOrder sort direction of this column; ascending when omitted
     */
    class ColumnDefEx(
      columnName: String,
      columnRole: ColumnRole,
      columnType: ColumnType[_],
      indexed: Boolean = false,
      val clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending
    ) extends ColumnDef(columnName, columnRole, columnType, indexed)
    
    /**
     * Factory methods for `ColumnDefEx`, modelled on the `ColumnDef` companion object.
     *
     * Every factory builds a `ColumnDefEx` but declares `ColumnDef` as its result type.
     */
    object ColumnDefEx {

      /** Builds a column with every attribute supplied explicitly. */
      def apply(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_],
        indexed: Boolean, clusteringOrder: ClusteringOrder.Order): ColumnDef =
        new ColumnDefEx(
          columnName,
          columnRole,
          columnType,
          indexed = indexed,
          clusteringOrder = clusteringOrder)

      /** Builds a non-indexed column; the clustering order is ascending when omitted. */
      def apply(columnName: String, columnRole: ColumnRole, columnType: ColumnType[_],
        clusteringOrder: ClusteringOrder.Order = ClusteringOrder.Ascending): ColumnDef =
        new ColumnDefEx(
          columnName,
          columnRole,
          columnType,
          indexed = false,
          clusteringOrder = clusteringOrder)

      /**
       * Builds a column from driver metadata (adapted from the `ColumnDef` object).
       *
       * The column counts as indexed when the metadata's `getIndex` is non-null; the
       * clustering order is left at the constructor default.
       */
      def apply(column: ColumnMetadata, columnRole: ColumnRole): ColumnDef = {
        val resolvedType = ColumnType.fromDriverType(column.getType)
        val name = column.getName
        val hasIndex = Option(column.getIndex).isDefined
        new ColumnDefEx(name, columnRole, resolvedType, hasIndex)
      }
    }
    
    /**
     * A `TableDef` whose generated CQL additionally carries a `CLUSTERING ORDER BY` clause
     * and caller-supplied table options.
     *
     * The first five parameters are forwarded unchanged to the `TableDef` constructor.
     *
     * @param options extra text appended to the statement; may be empty, or start with
     *                `WITH` or `AND` (see `appendOptions` for how it is joined)
     */
    class TableDefEx(keyspaceName: String, tableName: String, partitionKey: Seq[ColumnDef],
      clusteringColumns: Seq[ColumnDef], regularColumns: Seq[ColumnDef], options: String)
      extends TableDef(keyspaceName, tableName, partitionKey, clusteringColumns, regularColumns) {

      /**
       * The statement produced by `TableDef.cql`, followed by a `WITH CLUSTERING ORDER BY`
       * clause when there is at least one clustering column, followed by `options`.
       */
      override def cql: String = {
        val base = super.cql
        val ordered =
          if (clusteringColumns.nonEmpty)
            s"$base\r\nWITH CLUSTERING ORDER BY (${clusteringColumnOrder(clusteringColumns)})"
          else base
        appendOptions(ordered, options)
      }

      /**
       * Renders `name DESC` for a `ColumnDefEx` marked descending and `name ASC` for every
       * other column, joined with ", ".
       */
      private[this] def clusteringColumnOrder(clusteringColumns: Seq[ColumnDef]): String =
        clusteringColumns.map {
          case c: ColumnDefEx if c.clusteringOrder == ClusteringOrder.Descending =>
            s"${c.columnName} DESC"
          case c => s"${c.columnName} ASC"
        }.mkString(", ")

      /**
       * Joins `opts` onto `stmt`, rewriting the leading keyword of `opts` where needed:
       *  - blank `opts`: `stmt` is returned unchanged;
       *  - `stmt` already has a `WITH` and `opts` starts with `WITH`: the keyword becomes `AND`;
       *  - `stmt` has no `WITH` and `opts` starts with `AND`: the keyword becomes `WITH`;
       *  - otherwise `opts` is appended as given.
       *
       * NOTE(review): an existing `WITH` is detected by a plain substring search on `stmt`,
       * so an identifier that contains "WITH" would also match - confirm this is acceptable.
       */
      private[this] def appendOptions(stmt: String, opts: String): String = {
        val extra = opts.trim
        val stmtHasWith = stmt.contains("WITH")
        if (extra.isEmpty) stmt
        else if (stmtHasWith && extra.startsWith("WITH"))
          s"$stmt\r\nAND ${extra.substring(4).trim}"
        else if (!stmtHasWith && extra.startsWith("AND"))
          // The statement itself must be kept; only the leading keyword is swapped.
          s"$stmt\r\nWITH ${extra.substring(3).trim}"
        else s"$stmt\r\n$extra"
      }
    }
    
    /**
     * Factory methods for table definitions, modelled on the `TableDef` companion object.
     */
    object TableDefEx {

      /**
       * Creates a `TableDefEx` from its parts.
       *
       * @param options text passed as the `options` constructor argument; empty when omitted
       */
      def apply(keyspaceName: String, tableName: String, partitionKey: Seq[ColumnDef],
        clusteringColumns: Seq[ColumnDef], regularColumns: Seq[ColumnDef],
        options: String = ""): TableDefEx =
        new TableDefEx(
          keyspaceName = keyspaceName,
          tableName = tableName,
          partitionKey = partitionKey,
          clusteringColumns = clusteringColumns,
          regularColumns = regularColumns,
          options = options)

      /**
       * Asks the implicit `ColumnMapper[T]` for a new table definition.
       *
       * NOTE(review): the result is whatever the mapper's `newTable` returns, typed as
       * `TableDef`; nothing here wraps it in a `TableDefEx` - confirm that is intended.
       */
      def fromType[T: ColumnMapper](keyspaceName: String, tableName: String): TableDef = {
        val mapper = implicitly[ColumnMapper[T]]
        mapper.newTable(keyspaceName, tableName)
      }
    }
    

    这允许我以这种方式创建新表:

    // The clustering column is built with ColumnDefEx so it can carry a descending order.
    val descendingTs =
      ColumnDefEx("ts", ClusteringColumn(0), TimestampType, ClusteringOrder.Descending)

    val table = TableDefEx(
      keyspaceName = "so",
      tableName = "example",
      partitionKey = Seq(ColumnDef("parkey", PartitionKeyColumn, TextType)),
      clusteringColumns = Seq(descendingTs),
      regularColumns = Seq(ColumnDef("name", RegularColumn, TextType)))

    // NOTE(review): the selected names ("key", "time") differ from the table's column names
    // ("parkey", "ts") - confirm how the RDD fields are meant to map onto the table columns.
    rdd.saveAsCassandraTableEx(table, SomeColumns("key", "time", "name"))
    

相关问题