我已经在接收器模型中使用Kafka开发了火花流(1.6.2),并且批量大小为15秒 .

第一批是很多事件和处理记录非常慢 . 突然作业失败,它又重新开始 . 请参阅下面的屏幕截图 .

它正在缓慢地处理记录,但不能按预期完成所有这些批次,并且不希望看到这个队列堆积起来 .

enter image description here

我们如何将此输入大小控制在大约15到20k事件?我尝试启用spark.streaming.backpressure.enabled,但没有看到任何改进 .

我还在数据接收中实现了并行度级别,但仍然没有看到输入大小的任何变化 .

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

我使用6个 Actuator 和20个核心 .

我的代码概述:

我正在从Kafka读取日志并处理它们并在每15秒批处理间隔存储在elasticsearch中 .

您能告诉我如何控制输入尺寸并改善工作性能,或者我们如何确保批次不会起作用 .