我开发了火花流(reeiver方法),它从kafka读取数据并处理数据并写入elasticsearch .
相同的代码是用java开发的(现在我们在spark scala中编写相同的代码),当我们与java性能进行比较时,spark效果不佳 .
我所观察到的是,当我们写ES时,它需要时间 .
这是我的高级代码:
val kafkaStreams: util.List[DStream[String]] = new util.ArrayList[DStream[String]]
for(i <- 0 until topic_threads){
var data = KafkaUtils.createStream(ssc,kafkaConf,topic).map(line => line._2)
kafkaStreams.add(data)
}
//下面的union可以根据spark 1.6.2文档提高性能
val unifiedStream = ssc.union(kafkaStreams)
unifiedStream.persist(StorageLevel.MEMORY_ONLY)
if(flagY){
val dataES = unifiedStream.map(rdd => processData(rdd))
dataES.foreachRDD(rdd => {
ElasticUtils.saveToEs(rdd, index_Name, index_Type)
})
在processData方法中,我只是解析我们从kafka获取红色的数据 .
谁能告诉我你的经验或建议,以改善火花蒸汽(scala)接收器的接近性能 .
由于这种低性能,批次堆积和批量调度的延迟增加..