首页 文章

组合入站通道适配器和流 Launcher

提问于
浏览
1

我正在玩 Spring 天 Cloud 流反应,我正面临一个问题 .

请考虑以下代码:

@InboundChannelAdapter("list", poller = [(Poller(fixedDelay = "\${thetis.listInterval:60000}"))])
fun timerMessageSource(): Flux<Center> = config.centers.toFlux()

我的目标是产生一种通量,该通量应由以下形式消耗:

@StreamListener("list") @Output("download")
 fun processList(center: Center): Flux<Product> = ...

但这似乎不起作用 . 通道适配器正确生成通量,但它说如下:

org.springframework.messaging.converter.MessageConversionException: Could not read JSON: Unrecognized token 'FluxIterable': was expecting ('true', 'false' or 'null')

我以为我会在入站通道适配器上添加 StreamEmitter 注释,但这似乎不起作用 .

实现这种流程的正确方法是什么?

谢谢你,问候,

费尔南多

1 回答

  • 1

    异常非常明确:您生成一个 Flux 对象作为要发送到 list 通道的消息的 payload ,以发送到消息传递中间件上的目标目标 . 并且完全正确 Flux 与要序列化的JSON不兼容 .

    另一方面,我不确定什么是Kotlin并将您的代码编译为Java,但我们开箱即用(对于Java):

    @StreamEmitter
    @Output("list")
    public Flux<Center> timerMessageSource() {
         return config.centers.toFlux();
    }
    

    并且助焊剂中的每个发射项目将被序列化并作为记录或消息发送给Binder . 当然,如果您的 Center 与JSON兼容 . 为此,您需要 spring-cloud-stream-reactive 依赖项 .

    对, @InboundChannelAdapter 在这里没有帮助甚至是干扰 .

    如果您担心某些定期发射,如果应该考虑Project Reactor中的调度支持 .

相关问题