首页 文章

如何在Spark中以小块的形式迭代大型Cassandra表

提问于
浏览
3

在我的测试环境中,我有1个Cassandra节点和3个Spark节点 . 我想迭代一个大约有200k行的大表,每个大约需要20-50KB .

CREATE TABLE foo (
  uid timeuuid,
  events blob,
  PRIMARY KEY ((uid))
)

这是在spark集群中执行的scala代码

val rdd = sc.cassandraTable("test", "foo")

// This pulls records in memory, taking ~6.3GB
var count = rdd.select("events").count()  

// Fails nearly immediately with 
// NoHostAvailableException: All host(s) tried for query failed [...]
var events = rdd.select("events").collect()

Cassandra 2.0.9,Spark:1.2.1,Spark-cassandra-connector-1.2.0-alpha2

我试图只运行 collect ,没有 count - 在这种情况下它只是用 NoHostAvailableException 快速失败 .

Question: 迭代大表读取和一次处理小批量行的正确方法是什么?

1 回答

  • 6

    Cassandra Spark Connector中有2个设置来调整块大小(将它们放在SparkConf对象中):

    • spark.cassandra.input.split.size:每个Spark分区的行数(默认为100000)

    • spark.cassandra.input.page.row.size:每个获取页面的行数(即网络往返)(默认为1000)

    此外,您不应在示例中使用 collect 操作,因为它将获取驱动程序应用程序内存中的所有行,并可能引发内存不足异常 . 只有在确定它会产生少量行时才能使用 collect 动作 . count 动作不同,它只产生一个整数 . 所以我建议你像你一样从Cassandra加载你的数据,处理它,然后存储结果(用Cassandra,HDFS,等等) .

相关问题