我有一个火花流媒体工作,在客户端模式下工作得非常好 . 它从Kafka读取数据,处理它,并插入到Cassandra .

当我将部署模式更改为群集时,应用程序正在运行,但数据不会刷新到Cassandra . 我检查了所有日志,但没有错误 .

无论如何,在WEB UI中,我发现了一些非常有趣的问题:
enter image description here

看起来流中的所有批次都没有完成,它们都是活跃的!

那可能是什么问题呢?

EDITED

这是一些代码

val measurements = KafkaUtils.createDirectStream[
  Array[Byte],
  Array[Byte],
  DefaultDecoder,
  DefaultDecoder](ssc, kafkaConfig, Set("wattio"
))
  .map {
    case (k, v) => {
      val decoder = new AvroDecoder[WattioMeasure](null,
        WattioMeasure.SCHEMA$)
      decoder.fromBytes(v)
    }
  }

WattioFunctions.run(WattioFunctions.processWattioRaw(measurements))
     ((rdd: RDD[WattioTenantRaw], t: Time) => {
    rdd.cache()
    val differentTenants = rdd.map(a => a.tenant).distinct().collect()
    differentTenants.foreach(tenant => {
      val keyspace = tenant + "_readings"
      rdd.filter(a => a.tenant == tenant).map(s => s.wattioRaw).saveToCassandra(keyspace, "table")
    })
    rdd.unpersist(true)
  }
)

ssc.checkpoint("/tmp")
ssc.start()
ssc.awaitTermination()