首页 文章

如何在2.2.0中获得Apache Spark Dataframe的Cassandra cql字符串?

提问于
浏览
1

我试图获得一个给定Dataframe的cql字符串 . 我遇到了这个function

我可以在哪里做这样的事情

TableDef.fromDataFrame(df, "test", "hello", ProtocolVersion.NEWEST_SUPPORTED).cql()

在我看来,该库使用第一列作为分区键,而不关心群集密钥,那么如何指定使用Dataframe的特定列作为PartitionKey和ParticularSet作为聚类键?

看起来我可以创建一个新的TableDef但是我必须自己完成整个映射,在某些情况下,像JavaType这样的必要函数是不可访问的 . 例如,我尝试创建一个新的ColumnDef,如下所示

new ColumnDef("col5", new PartitionKeyColumn(), ColumnType is not accessible in Java)

Objective: 从Spark DataFrame获取CQL create语句 .

Input 我的数据框可以包含任意数量的列及其各自的Spark类型 . 所以说我有一个包含100列的Spark Dataframe,其中我的col8,我的数据帧的col9对应于cassandra partitionKey列,而我的column10对应于cassandra clustering Key列

col1| col2| ...|col100

现在我想使用spark-cassandra-connector库给出一个CQL创建表语句,给出上面的信息 .

Desired Output

create table if not exists test.hello (
   col1 bigint, (whatever column1 type is from my dataframe I just picked bigint randomly)
   col2 varchar,
   col3 double,
   ...
   ...
   col100 bigint,
   primary key(col8,col9)
) WITH CLUSTERING ORDER BY (col10 DESC);

1 回答

  • 1

    因为必需的组件( PartitionKeyColumnColumnType 的实例)是Scala中的对象,所以您需要使用以下语法来访问它们的内容:

    // imports
    import com.datastax.spark.connector.cql.ColumnDef;
    import com.datastax.spark.connector.cql.PartitionKeyColumn$;
    import com.datastax.spark.connector.types.TextType$;
    
    // actual code
    ColumnDef a = new ColumnDef("col5",  
          PartitionKeyColumn$.MODULE$, TextType$.MODULE$);
    

    请参阅ColumnRolePrimitiveTypes的代码以查找对象/类名称的完整列表 .

    Update after additional requirements :代码冗长,但应该有效......

    SparkSession spark = SparkSession.builder()
                    .appName("Java Spark SQL example").getOrCreate();
    
    Set<String> partitionKeys = new TreeSet<String>() {{
                    add("col1");
                    add("col2");
            }};
    Map<String, Integer> clustereingKeys = new TreeMap<String, Integer>() {{
                    put("col8", 0);
                    put("col9", 1);
            }};
    
    Dataset<Row> df = spark.read().json("my-test-file.json");
    TableDef td = TableDef.fromDataFrame(df, "test", "hello", 
                    ProtocolVersion.NEWEST_SUPPORTED);
    
    List<ColumnDef> partKeyList = new ArrayList<ColumnDef>();
    List<ColumnDef> clusterColumnList = new ArrayList<ColumnDef>();
    List<ColumnDef> regColulmnList = new ArrayList<ColumnDef>();
    
    scala.collection.Iterator<ColumnDef> iter = td.allColumns().iterator();
    while (iter.hasNext()) {
            ColumnDef col = iter.next();
            String colName = col.columnName();
            if (partitionKeys.contains(colName)) {
                    partKeyList.add(new ColumnDef(colName, 
                                    PartitionKeyColumn$.MODULE$, col.columnType()));
            } else if (clustereingKeys.containsKey(colName)) {
                    int idx = clustereingKeys.get(colName);
                    clusterColumnList.add(new ColumnDef(colName, 
                                    new ClusteringColumn(idx), col.columnType()));
            } else {
                    regColulmnList.add(new ColumnDef(colName, 
                                    RegularColumn$.MODULE$, col.columnType()));
            }
    }
    
    TableDef newTd = new TableDef(td.keyspaceName(), td.tableName(), 
                    (scala.collection.Seq<ColumnDef>) partKeyList,
                    (scala.collection.Seq<ColumnDef>) clusterColumnList, 
                    (scala.collection.Seq<ColumnDef>) regColulmnList,
                    td.indexes(), td.isView());
    String cql = newTd.cql();
    System.out.println(cql);
    

相关问题