我开发了火花流(reeiver方法),它从kafka读取数据并处理数据并写入elasticsearch .

相同的代码是用java开发的(现在我们在spark scala中编写相同的代码),当我们与java性能进行比较时,spark效果不佳 .

我所观察到的是,当我们写ES时,它需要时间 .

这是我的高级代码:

val kafkaStreams: util.List[DStream[String]] = new util.ArrayList[DStream[String]]

for(i <- 0 until topic_threads){
      var data = KafkaUtils.createStream(ssc,kafkaConf,topic).map(line => line._2)
      kafkaStreams.add(data)
    }

//下面的union可以根据spark 1.6.2文档提高性能

val unifiedStream = ssc.union(kafkaStreams)


unifiedStream.persist(StorageLevel.MEMORY_ONLY)
if(flagY){
   val dataES = unifiedStream.map(rdd => processData(rdd))
   dataES.foreachRDD(rdd => {
     ElasticUtils.saveToEs(rdd, index_Name, index_Type)
})

在processData方法中,我只是解析我们从kafka获取红色的数据 .

谁能告诉我你的经验或建议,以改善火花蒸汽(scala)接收器的接近性能 .

由于这种低性能,批次堆积和批量调度的延迟增加..