首页 文章

使用spark将巨大的cassandra表迁移到另一个集群

提问于
浏览
0

我想将旧的Cassandra集群迁移到新集群 .

Requirements:-

我有一个10个节点的cassandra集群,我要迁移的表是~100GB . 我正在使用spark来迁移数据 . 我的火花星团有10个节点,每个节点有大约16GB的内存 . 在表中我们有一些垃圾数据,我不想迁移到新表 . 例如: - 假设我不想传输具有cid = 1234的行 . 那么,使用spark工作迁移它的最佳方法是什么?我不能直接在cassandraRdd上放置过滤,因为cid不是分区键中包含的唯一列 .

Cassandra Table:-

test_table (
    cid text,
    uid text,
    key text,
    value map<text, timestamp>,
    PRIMARY KEY ((cid, uid), key)
)

Sample Data:-

cid   | uid                | key       | value
------+--------------------+-----------+-------------------------------------------------------------------------
 1234 | 899800070709709707 | testkey1  | {'8888': '2017-10-22 03:26:09+0000'}
 6543 | 097079707970709770 | testkey2  | {'9999': '2017-10-20 11:08:45+0000', '1111': '2017-10-20 15:31:46+0000'}

我想的是下面的东西 . 但我想这不是最有效的方法 .

val filteredRdd = rdd.filter { row => row.getString("cid") != "1234" }
filteredRdd.saveToCassandra(KEYSPACE_NAME,NEW_TABLE_NAME)

这里最好的方法是什么?

1 回答

  • 1

    那种方法非常好 . 您可能希望在DataFrame中编写它以利用行编码,但这可能只会带来一些好处 . 这项行动的关键瓶颈是Cassandra的写作和阅读 .
    DF示例

    spark
      .read
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", ks)
      .option("table", table)
      .load
      .filter( 'cid !== "1234" )
      .write
      .format("org.apache.spark.sql.cassandra")
      .option("keyspace", ks2)
      .option("table", table2)
      .save
    

相关问题