假设我们在HDFS中有一个文件,要解析,操作并存储到Cassandra(2.2.7)中 . 对于这个任务,我使用Spark(1.6.2)和spark-cassandra-connector(1.6.0) .

我有一个包含6个节点的集群 . 我正在使用其中一个作为spark-master,然后在每个其他节点上我有一个spark worker和一个cassandra节点 . 每个节点有32个内核和32GB内存,对于集群,最多可添加160个内核(5个节点* 32个内核)和150GB内存 .

Spark工作如下:

val conf = new SparkConf(true)
  .set("spark.cassandra.connection.host", "host")
val sc = new SparkContext(conf)

// START READING
val start_reading = System.currentTimeMillis()

val filePath = "hdfs://file_path"
val elementsRDD = sc.textFile(filePath)
println("# of elements: " + elementsRDD.count)
println("# of elements partitions: " + elementsRDD.partitions.length)

val end_reading = System.currentTimeMillis()
val delta_reading = (end_reading - start_reading) / 1000
println("Time spent reading: " + delta_reading / 60 + "m " + delta_reading % 60 + "s")
// END READING

// START PARSING
val start_parsing = System.currentTimeMillis()

val parsedElementsRDD = elementsRDD.flatMap(line => {
  val parser = new XMLElementParser
  parser.parseXML(line)
  parser.elements.flatMap(handleElement(_))
})
println("# of parsed elements: " + parsedElementsRDD.count)
println("# of parsed elements partitions: " + parsedEventsRDD.partitions.length)

val end_parsing = System.currentTimeMillis()
val delta_parsing = (end_parsing - start_parsing) / 1000
println("Time spent parsing: " + delta_parsing / 60 + "m " + delta_parsing % 60 + "s")
// END PARSING

// START SAVING
val start_saving = System.currentTimeMillis()

parsedElementsRDD.saveToCassandra("keyspace", "table")

val end_saving = System.currentTimeMillis()
val delta_saving = (end_saving - start_saving) / 1000
println("Time spent saving: " + delta_saving / 60 + "m " + delta_saving % 60 + "s")
// END SAVING

在阅读和解析时,我正在执行一些操作(计数),以确保在开始对保存时间进行基准测试之前评估转换 .

我按如下方式创建了我的键空间和表:

create keyspace example_keyspace with 
replication = {'class': 'SimpleStrategy', 'replication_factor': 3};

create table example_keyspace.elements (id varint, time varint, col1 text, 
col2 varint, col3 text, primary key(id, time));

鉴于不同的条目可以具有相同的元素标识符,我将id和时间用作主键 .

关于当前的吞吐量,我得到以下时间:

# of elements: 2000000
# of elements partitions: 57
Time spent reading: 1m 35s

# of parsed elements: 142790398
# of parsed elements partitions: 57
Time spent parsing: 1m 17s

Time spent saving: 27m 43s

所以基本上,它花费27m 43s = 1663s来存储~150_000_000条记录,这是~90_198条记录/秒 .

如果我们看一下Cassandra集群状态,我们会得到以下结果:

Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address         Load       Tokens       Owns (effective)  
UN  xxx.xxx.xx.xxx  4.42 GB    256          60.9%             
UN  xxx.xxx.xx.xxx  4.2 GB     256          58.0%             
UN  xxx.xxx.xx.xxx  4.54 GB    256          62.4%             
UN  xxx.xxx.xx.xxx  4.52 GB    256          62.2%             
UN  xxx.xxx.xx.xxx  4.1 GB     256          56.6%

这意味着数据正在 balancer ,我们没有瓶颈,因为所有记录都写入同一个Cassandra节点 .

乍一看,它似乎不错,但我认为它可以更好地工作 . 所以我想到了以下问题:

  • 我是否有可能不利用数据局部性并且正在通过网络传输大量数据?在我的设置中,我有5个Cassandra节点与5个Spark工作者共处一地,但是如何确保他们在可能的情况下在本地互相交谈?我怎样才能看到通过网络发送了多少数据?

  • 调整(调整批量大小,每批行数,执行程序内存,执行程序内核,驱动程序内存等)有帮助吗?我应该如何调整这些?我尝试了一些,但基本上我是盲目地尝试不同的 Value 而没有改进 .

  • 我应该尝试批量加载吗?是不是可以使用spark-cassandra-connector?如果没有,是否有任何转机?

  • 我的群集配置可能不正确吗?也许Cassandra和Spark互相干扰,其中一个人渴望资源匮乏另一个?

  • 没有使用连接器有没有更快的替代方案?

在任何情况下,任何帮助都非常感谢 .