How to copy a columnfamily from one cassandra cluster to another?

Scenario:

  • I only have the host IPs (for both the source and target clusters), the ports, the keyspace name, and the column family name.

  • I have already created the metadata in the target cluster (only the data needs to be copied).

  • Most preferably, I would like to do this with the spark-cassandra-connector Java API in one or more Spark jobs (creating a DataFrame in between and then saving it).

  • Moderately preferably, using the cassandra-java driver from DataStax.

  • Least preferably, using the cassandra-jdbc driver together with the spark-cassandra-connector Java API.

Any help would be appreciated. Thanks.

4 Answers

  • 3

    Take a snapshot on the existing cluster and use the bulk loader on the destination cluster; no Spark needed (though you could use it).

    Here are the docs for the procedure, but I'll give a high-level overview of what you need to do.

    • Take a snapshot on the existing cluster

    • Send (scp) the snapshot to a node on the new cluster

    • Create a clone of the schema (you said you have already done this)

    • Use the bulk loader to stream the sstables from the snapshot into the new cluster (sketched below).
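    A rough command-line sketch of those steps, assuming a standard Cassandra installation with nodetool and sstableloader on the path; the keyspace/table names, snapshot tag, paths, and target IP below are placeholders, and the exact data directory layout depends on your version:

    # 1. On each node of the source cluster: snapshot the keyspace
    nodetool snapshot -t my_snapshot my_keyspace

    # 2. Copy the snapshot sstables somewhere that can reach the new cluster;
    #    sstableloader expects the path to end in <keyspace>/<table>
    scp -r /var/lib/cassandra/data/my_keyspace/my_table-*/snapshots/my_snapshot/* \
        user@staging-host:/tmp/load/my_keyspace/my_table/

    # 3. (Schema is already created on the target cluster)

    # 4. Stream the sstables into the new cluster; -d takes initial contact points
    sstableloader -d targetClusterIP /tmp/load/my_keyspace/my_table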

  • 2

    After a lot of effort we found the solution, and it turned out to be surprisingly simple. We can do it with Spark; let's see how.

    What we were doing (which didn't work):

    // Reading from the first Cassandra cluster
    dataframe = cassandraSQLContext.read()
            .format("org.apache.spark.sql.cassandra")
            .options(otherOptionsMap)   // placeholder Map with keyspace, table, etc.
            .option("spark.cassandra.connection.host", "firstClusterIP")
            .load();

    // Writing to the second Cassandra cluster
    dataframe.write()
            .format("org.apache.spark.sql.cassandra")
            .mode(saveMode)             // placeholder for your save mode
            .options(otherOptionsMap)
            .option("spark.cassandra.connection.host", "secondClusterIP")
            .save();
    

    What worked fine:

    // Reading from the first Cassandra cluster
    dataframe = cassandraSQLContext.read()
            .format("org.apache.spark.sql.cassandra")
            .options(otherOptionsMap)
            .option("spark_cassandra_connection_host", "firstClusterIP")
            .load();

    // Writing to the second Cassandra cluster
    dataframe.write()
            .format("org.apache.spark.sql.cassandra")
            .mode(saveMode)
            .options(otherOptionsMap)
            .option("spark_cassandra_connection_host", "secondClusterIP")
            .save();
    

    Yes, that's right: you only have to change the period (.) to an underscore (_) in the spark-cassandra host property. I don't know whether this is a bug in the spark-cassandra connector.
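    For what it's worth, the connector also has documented cluster-level settings for DataFrames that sidestep the dot-versus-underscore question: register each cluster's host under an alias on the SQL context, then pick the alias per read/write with the cluster option. A minimal Java sketch, assuming a Spark 1.6-era SQLContext and spark-cassandra-connector 1.6+; the aliases, keyspace, and table names are placeholders:

    // Register the two clusters under aliases (cluster-level settings)
    cassandraSQLContext.setConf("ClusterOne/spark.cassandra.connection.host", "firstClusterIP");
    cassandraSQLContext.setConf("ClusterTwo/spark.cassandra.connection.host", "secondClusterIP");

    // Read from the first cluster by alias
    DataFrame dataframe = cassandraSQLContext.read()
            .format("org.apache.spark.sql.cassandra")
            .option("cluster", "ClusterOne")
            .option("keyspace", "ks")
            .option("table", "tab")
            .load();

    // Write the same rows to the second cluster by alias
    dataframe.write()
            .format("org.apache.spark.sql.cassandra")
            .option("cluster", "ClusterTwo")
            .option("keyspace", "ks")
            .option("table", "tab")
            .mode("append")
            .save();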

  • 0

    If you are using the spark-cassandra-connector, it supports connecting to multiple clusters out of the box. The relevant snippet is below:

    import com.datastax.spark.connector._
    import com.datastax.spark.connector.cql._
    
    import org.apache.spark.SparkContext
    
    
    def twoClusterExample(sc: SparkContext) = {
      val connectorToClusterOne = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.1"))
      val connectorToClusterTwo = CassandraConnector(sc.getConf.set("spark.cassandra.connection.host", "127.0.0.2"))
    
      val rddFromClusterOne = {
        // Sets connectorToClusterOne as default connection for everything in this code block
        implicit val c = connectorToClusterOne
        sc.cassandraTable("ks","tab")
      }
    
      {
        //Sets connectorToClusterTwo as the default connection for everything in this code block
        implicit val c = connectorToClusterTwo
        rddFromClusterOne.saveToCassandra("ks","tab")
      }
    
    }
    

    Here is the relevant documentation and example snippet.

  • 3

    Java example

    This will work:

    // Imports assumed by this snippet
    import static com.datastax.spark.connector.japi.CassandraJavaUtil.mapTupleToRow;

    import com.datastax.spark.connector.cql.CassandraConnector;
    import com.datastax.spark.connector.japi.CassandraJavaUtil;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import scala.Tuple2;

    private static String sourceKeyspace = null;
    private static String targetKeyspace = null;
    private static String sourceHost = null;
    private static String targetHost = null;
    private static String sourceUsername = null;
    private static String targetUsername = null;
    private static String sourcePassword = null;
    private static String targetPassword = null;
    private static String sourceColumnFamily = null;
    private static String targetColumnFamily = null;
    private static String[] sourceColumns = null;
    // Set all of the above values according to your requirements
    
    private static JavaSparkContext sc;
    SparkConf sparkConf;
    
    sparkConf = new SparkConf(true).setAppName("Source Cassandra to Target Cassandra job");
    sparkConf.setMaster(jobConfig.getString("spark.context-settings.master")); // Omit this line when running against a local Spark master
    sparkConf
            .set("spark.cassandra.connection.host", sourceHost)
            .set("spark.cassandra.input.fetch.size_in_rows", jobConfig.getString("spark.context-settings.fetchsize"))
            .set("spark.cassandra.input.split.size_in_mb", jobConfig.getString("spark.context-settings.splitsize"))
            .set("spark.cassandra.auth.username", sourceUsername)
            .set("spark.cassandra.auth.password", sourcePassword)
            .set("cassandra.username", sourceUsername)
            .set("cassandra.password", sourcePassword)
            .set("spark.cassandra.input.consistency.level", jobConfig.getString("spark.context-settings.spark.cassandra.consistency.level"))
            .set("spark.executor.memory", jobConfig.getString("spark.context-settings.spark.executor.memory"))
            .set("spark.driver.memory",jobConfig.getString("spark.context-settings.spark.driver.memory"))
            .set("spark.executor.tasks", jobConfig.getString("spark.context-settings.spark.executor.tasks"))
            .set("spark.mesos.coarse", "true")
            .set("spark.cores.max", jobConfig.getString("spark.context-settings.spark.cores.max"))
            .set("spark.scheduler.mode", "FAIR")
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sc = new JavaSparkContext(sparkConf);
    
    JavaRDD<Tuple2<String, Integer>> tupleRows = CassandraJavaUtil.javaFunctions(sc.sc())
            .cassandraTable(sourceKeyspace, sourceColumnFamily)
            .select(sourceColumns)
            .map(row -> {
                String authorName = row.getString("author_name");
                Integer numBooks = row.getInt("num_books"); // getInt, not getString: the target type is Integer
                return new Tuple2<>(authorName, numBooks);
            });
    

    The main part is using com.datastax.spark.connector.cql.CassandraConnector and writerBuilder:

    CassandraConnector targetConnection = CassandraConnector.apply(
        sparkConf.set("spark.cassandra.connection.host",targetHost)
        .set("spark.cassandra.auth.username", targetUsername)
        .set("spark.cassandra.auth.password", targetPassword)
        .set("cassandra.username", targetUsername)
        .set("cassandra.password", targetPassword)
    );
    
    CassandraJavaUtil.javaFunctions(tupleRows).writerBuilder(targetKeyspace, targetColumnFamily, mapTupleToRow(String.class, Integer.class))
    .withConnector(targetConnection)
    .saveToCassandra();
    
    sc.stop();
    

    Voila! You're done!

    https://datastax-oss.atlassian.net/browse/SPARKC-340
