我正在开发一个Spring Webflux项目,并且遇到了一个试图在计划任务中发布和使用Flux的问题 .
@Scheduled(fixedRate = 20*1000)
fun updateNews() {
try {
logger.info("Automatic Update at: ${LocalDateTime.now()}")
articleRepository.saveAll(
sourceRepository.findAll().publishOn(Schedulers.parallel())
.map { source -> source.generate() }
.flatMap { it?.read() ?: Flux.empty() })
.timeout(Duration.ofSeconds(20)
).subscribeOn(Schedulers.parallel())
} catch(e: Throwable) {
logger.log(Level.SEVERE, "Error in Scheduler", e)
}
}
我配置的调度程序:
ConcurrentTaskScheduler(Executors.newScheduledThreadPool(3))
除非我故意阻止,否则此任务永远不会完成:
.then().block()
我原本不打扰直接引用发布/订阅调度程序,我已经尝试了所有似乎合理无效的选项 .
我的日志事件发生了,但似乎当调度程序执行此任务的线程死亡时,通量也是垃圾;即使我们指定了publishOn和subscribeOn行为,它们应该在自己的线程池中?
我想让这个行动完全被动,任何建议都会受到赞赏 .
1 回答
@Scheduled
未与Flux
集成,因此如果您返回Flux
,它将不知道如何处理 . 再加上Reactor(以及一般的Reactive Streams)这个事实,在你subscribe()
之前一切都没有发生,你可以开始看到出了什么问题 .block()
实际上是subscribe()
的一种形式,这就是为什么它一旦你添加到代码就可以工作的原因 . 它实际上可能是最好的选择,因为你需要将一段反应代码(从ReactiveRepository
)桥接到命令性阻塞世界(你的@Scheduled fun
) .