首页 文章

如何在Spring Webflux / Reactor Netty Web应用程序中执行阻塞调用

提问于
浏览
4

在我的用例中,我有一个带有Reactor Netty的Spring Webflux微服务,我有以下依赖项:

  • org.springframework.boot.spring-boot-starter-webflux (2.0.1.RELEASE)

  • org.springframework.boot.spring-boot-starter-data-mongodb-reactive (2.0.1.RELEASE)

  • org.projectreactor.reactor-spring (1.0.1.RELEASE)

对于一个非常具体的案例,我需要从我的Mongo数据库中检索一些信息,并将其处理成与我的被动 WebClient 一起发送的查询参数 . 由于 WebClientUriComponentsBuilder 接受发布者(Mono / Flux),我使用 #block() 调用来接收结果 .

由于 reactor-core (版本0.7.6.RELEASE)已包含在最新的 spring-boot-dependencies (版本2.0.1.RELEASE)中,因此不再可以使用: block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx ,请参阅 - > https://github.com/reactor/reactor-netty/issues/312

我的代码片段:

public Mono<FooBar> getFooBar(Foo foo) {
    MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
    parameters.add("size", foo.getSize());
    parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
        .map(Bar::toString)
        .collectList()
        .block());

    String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
        .port(8081)
        .path("/foo-bar")
        .queryParams(parameters)
        .build()
        .toString();

    return webClient.get()
        .uri(url)
        .retrieve()
        .bodyToMono(FooBar.class);
}

这适用于 spring-boot 版本2.0.0.RELEASE,但自从升级到版本2.0.1.RELEASE并因此从 reactor-core 升级到版本0.7.6.RELEASE后不再允许 .

我看到的唯一真正的解决方案是包括一个块(非反应性)存储库/ mongo客户端,但我不确定是否鼓励这样做 . 有什么建议?

1 回答

  • 2

    WebClient 不接受其请求URL的 Publisher 类型,但没有任何内容阻止您执行以下操作:

    public Mono<FooBar> getFooBar(Foo foo) {
    
        Mono<List<String>> bars = barReactiveCrudRepository
            .findAllByIdentifierIn(foo.getBarIdentifiers())
            .map(Bar::toString)
            .collectList();
    
        Mono<FooBar> foobar = bars.flatMap(b -> {
    
            MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
            parameters.add("size", foo.getSize());
            parameters.addAll("bars", b);
    
            String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
                .port(8081)
                .path("/foo-bar")
                .queryParams(parameters)
                .build()
                .toString();
    
            return webClient.get()
                .uri(url)
                .retrieve()
                .bodyToMono(FooBar.class);
        });
        return foobar;         
    }
    

    如果有的话,这个新的reactor-core检查可以防止你在WebFlux处理程序中使用这个阻塞调用来破坏你的整个应用程序 .

相关问题