首页 文章

在Spring Boot Client中接收Flux

提问于
浏览
0

这是Spring 5 Web Reactive - How can we use WebClient to retrieve streamed data in a Flux?的后续问题

我尝试遵循建议如何使用Spring WebClient接收Flux但实际上遇到了netty问题 .

在服务器端,代码是一个简单的控制器,公开Mongo存储库的findAll方法:

@RequestMapping("power")
public Flux<Power> getAll() {
    return powerRepository.findAll();
}

在客户端上,消费代码与上面给出的答案类似:

@Bean
public CommandLineRunner readFromApiServer() {
    return new CommandLineRunner() {
        @Override
        public void run(String... args) throws Exception {
            WebClient webClient = WebClient.create("http://localhost:8080");
            Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                    .flatMap(response -> response.bodyToFlux(Power.class));
            flux.subscribe();
        }
    };
}

但这引发了一个例外:

2017-02-27 08:19:41.281 ERROR 99026 --- [ctor-http-nio-5] r.ipc.netty.channel.ChannelOperations:[HttpClient]处理连接时出错 . 请求关闭通道io.netty.util.IllegalReferenceCountException:refCnt:0,减少:1在io.netty.buffer.AbstractReferenceCountedByteBuf.release0(AbstractReferenceCountedByteBuf.java:101)〜[netty-all-4.1.8.Final.jar: 4.1.8.Final]

我正在使用当前的Spring Boot 2.0.0 BUILD-SNAPSHOT .

什么是这个例外告诉我的?我该怎么做对吗?

1 回答

  • 1

    所有 CommandLineRunner bean在Spring Boot应用程序启动时执行 . 如果没有守护程序线程(即当前应用程序不是Web应用程序),则应用程序将在所有运行程序执行后关闭 .

    在您的情况下,使用 flux.subscribe() 仅"Start the chain and request unbounded demand"(javadoc),因此此方法调用不会阻塞 . 我怀疑这个命令行运行器在您有机会对您的磁通做任何事情之前返回,应用程序关闭并且您的网络资源已关闭 .

    此外,您没有对HTTP请求的结果做任何事情 . 我认为使用以下代码更新代码示例应该可以解决问题:

    @Bean
    public CommandLineRunner readFromApiServer() {
        return new CommandLineRunner() {
            @Override
            public void run(String... args) throws Exception {
                WebClient webClient = WebClient.create("http://localhost:8080");
                Flux<Power> flux = webClient.get().uri("/power").accept(MediaType.APPLICATION_STREAM_JSON).exchange()
                        .flatMap(response -> response.bodyToFlux(Power.class));
                List<Power> powerList = flux.collectList().block();
                // do something with the result?
            }
        };
    }
    

相关问题