首页 文章

Spark DataFrame和Cassandra

提问于
浏览
2

我们一直在使用Spark RDD API(Spark 2.0)来处理在Cassandra中建模的数据 . 请注意,数据是在Cassandra中建模的,以便进行有效的读写操作 .

但是现在还有Spark SQL API,Spark DataFrame API也是一种备用数据访问方法 - http://spark.apache.org/docs/latest/sql-programming-guide.html

使用Spark RDD,我们使用Cache使用Datastax Cassandra驱动程序API访问Cassandra DB - http://docs.datastax.com/en/developer/java-driver/2.0/,类似于

val resultSets = new util.ArrayList[Row]()
val resultSet = CassandraConnector(SparkReader.conf).withSessionDo[ResultSet] { session =>
     val sel_stmt = QueryBuilder.select("yyy", "zz", "xxxx")
                .from("geokpi_keyspace", table_name)
                .where(QueryBuilder.eq("bin", bin))
                .and(QueryBuilder.eq("year", year))
                .and(QueryBuilder.eq("month", month))
                .and(QueryBuilder.eq("day", day))
                .and(QueryBuilder.eq("cell", cell))

    session.execute(sel_stmt)

    }
resultSets.addAll(resultSet.all())
})
resultSets.asScala.toList --> RDD[Row]

由于我们几乎直接使用CQL,因此不允许您执行Cassandra不支持的事情,如Cassandra设计不支持它 . 但是,使用Spark SQL或Spark DataFrame API访问Cassandra DB的另一种方法是为您提供SQL类型抽象 . 对于底层的Relational DB,这将是一件好事 .

但是使用这种抽象,比如JOIN来查询存储在像Cassandra这样的NoSQL数据库中的数据似乎是一个错误的抽象 . 在Spark中使用这个抽象,而不知道任何关于数据模型(分区键,聚类键等),这是对于高效的数据读取和写入非常重要,它是否会导致生成无效的代码以及从底层Cassandra节点无效/慢速的数据检索?

1 回答

  • 0

    我认为你在使用Spark SQL时我们忽略数据模型的假设是不正确的,它实践我们在非常严格的 Contract 下工作,其中数据源默认只处理基本的投影和选择,并且重处理由Spark集群 .

    同时,数据源开发人员可以在设计给定连接器时自由地包含任何类型的域或系统特定知识 . JDBC数据源就是一个很好的例子,您可以查看我对How to partition Spark RDD when importing Postgres using JDBC?的回答,看看如何使用它来执行某些非标准操作 .

    虽然Cassandra Connector似乎在这里略有限制(请原谅我,如果我错了,我还没有广泛使用它)它的RDD组件提供了广泛的Cassandra感知操作,可用于执行服务器端操作和优化整体工作流程 .

    无论如何,Spark都试图强制外部系统执行那里不支持的操作 .

    它不会导致效率低下的生成代码和无效/慢速数据检索

    我们在这里要问的根本问题是它为什么重要 . 仅仅通过使用给定的分析工作来源的事实,我们隐含地接受这样一个事实,即我们可能会以某种方式对系统施加压力,使其不适合日常操作使用 .

    同时,如果我们使用的系统不支持我们的数据处理管道中所需的某些操作,我们应该接受执行这些操作的成本可能远高于优化系统 . 虽然低效的处理需要花钱,但在选择技术堆栈和设计基础设施时应该考虑这个问题 .

    最后,如果某些操作对性能产生不可接受的影响(是的,连接很昂贵),它应该反映在数据建模中 .

    因为我们几乎直接使用CQL,所以它不允许你做像Cassandra那样不支持的事情,因为Cassandra设计不支持它

    正如已经解释过的那样,Spark SQL也没有 . 直接获取数据并稍后执行连接不会更改执行模型中的任何内容 .

    忽略此特定示例中没有任何内容无法由 DataFrame API处理,并且可以使用cassandraTable执行更复杂的检索 .

相关问题