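As part of my project, I have to build a SQL query interface for a very large Cassandra dataset. I have therefore been looking at different ways to run SQL queries on Cassandra column families using Spark, and I have come up with 3 different approaches: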

  • using Spark SQLContext with a statically defined schema
// statically defined in the application
public static class TableTuple implements Serializable {
    private int id;
    private String line;

    TableTuple (int i, String l) {
        id = i;
        line = l;
    }

    // getters and setters
    ...
}

I use this definition as follows:

SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SQLContext sqlContext = new SQLContext(sc);

JavaRDD<CassandraRow> rowrdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
JavaRDD<TableTuple> rdd = rowrdd.map(row -> new TableTuple(row.getInt(0), row.getString(1)));

DataFrame dataFrame = sqlContext.createDataFrame(rdd, TableTuple.class);
dataFrame.registerTempTable("lines");

DataFrame resultsFrame = sqlContext.sql("Select line from lines where id=1");

System.out.println(Arrays.asList(resultsFrame.collect()));
  • using Spark SQLContext with a dynamically defined schema
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);
SQLContext sqlContext = new SQLContext(sc);

JavaRDD<CassandraRow> cassandraRdd = javaFunctions(sc).cassandraTable(CASSANDRA_KEYSPACE, CASSANDRA_COLUMN_FAMILY);
JavaRDD<Row> rdd = cassandraRdd.map(row -> RowFactory.create(row.getInt(0), row.getString(1)));

List<StructField> fields = new ArrayList<>();
fields.add(DataTypes.createStructField("id", DataTypes.IntegerType, true));
fields.add(DataTypes.createStructField("line", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);

DataFrame dataFrame = sqlContext.createDataFrame(rdd, schema);
dataFrame.registerTempTable("lines");

DataFrame resultDataFrame = sqlContext.sql("select line from lines where id = 1");

System.out.println(Arrays.asList(resultDataFrame.collect()));
  • using CassandraSQLContext from the spark-cassandra-connector
SparkConf conf = new SparkConf(true)
        .set("spark.cassandra.connection.host", CASSANDRA_HOST)
        .setJars(jars);

SparkContext sc = new SparkContext(HOST, APP_NAME, conf);

CassandraSQLContext sqlContext = new CassandraSQLContext(sc);
DataFrame resultsFrame = sqlContext.sql("Select line from " + CASSANDRA_KEYSPACE + "." + CASSANDRA_COLUMN_FAMILY + " where id = 1");

System.out.println(Arrays.asList(resultsFrame.collect()));
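
A side note on the first approach: as far as I understand, `createDataFrame(rdd, TableTuple.class)` derives the column names and types from the JavaBean getters of the class, not from its private fields. The sketch below uses only the JDK (no Spark) to show what standard bean introspection reports for such a class. The public getters and setters, the no-argument constructor, the wrapper class `BeanSchemaSketch`, and the helper `describeProperties` are my own assumptions for illustration; the elided accessors in my real class may differ.

```java
import java.beans.BeanInfo;
import java.beans.IntrospectionException;
import java.beans.Introspector;
import java.beans.PropertyDescriptor;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class BeanSchemaSketch {

    // Hypothetical stand-in for TableTuple with the accessors written out (assumed, not the original code).
    public static class TableTuple implements Serializable {
        private int id;
        private String line;

        public TableTuple() {}

        public TableTuple(int id, String line) {
            this.id = id;
            this.line = line;
        }

        public int getId() { return id; }
        public void setId(int id) { this.id = id; }
        public String getLine() { return line; }
        public void setLine(String line) { this.line = line; }
    }

    // Returns "name: type" for every readable bean property, skipping the built-in "class" property.
    public static List<String> describeProperties(Class<?> beanClass) throws IntrospectionException {
        BeanInfo info = Introspector.getBeanInfo(beanClass);
        List<String> columns = new ArrayList<>();
        for (PropertyDescriptor pd : info.getPropertyDescriptors()) {
            if (pd.getReadMethod() != null && !"class".equals(pd.getName())) {
                columns.add(pd.getName() + ": " + pd.getPropertyType().getSimpleName());
            }
        }
        return columns;
    }

    public static void main(String[] args) throws IntrospectionException {
        // Prints the properties a bean-based schema would be built from.
        System.out.println(describeProperties(TableTuple.class));
    }
}
```

If this is right, a class without public getters would expose no properties to introspection, which is one practical difference from the second approach, where the schema is spelled out explicitly with `StructField`s.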

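I would like to know the advantages and disadvantages of each approach compared with the others. Also, for the CassandraSQLContext approach, are queries limited to CQL, or is it fully compatible with Spark SQL? I would also appreciate an analysis for my specific use case: I have a Cassandra column family with about 17.6 million tuples and 62 columns. Which approach is most appropriate for querying a database of this size?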