首页 文章

等待运行Reactor Mono实例完成

提问于
浏览
0

我写了这段代码来分拆大量的WebClients(受 reactor.ipc.netty.workerCount 限制),立即启动Mono,等待所有Monos完成:

List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
   for (MetricConfig metricConfig : metricConfigs) {
        try {
            monos.add(extractMetrics.queryMetricData(metricConfig)
                  .doOnSuccess(result -> {
                      metricDataList.addAll(result);
                  })
                  .cache());
        } catch (Exception e) {
        }
    }

    Mono.when(monos)
          .doFinally(onFinally -> {
              Map<String, Date> latestMap;
              try {
                  latestMap = extractInsights.queryInsights();
                  Transform transform = new Transform(copierConfig.getEventType());
                  ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
              } catch (Exception e) {
                  log.error("copy: mono: when: {}", e.getMessage(), e);
              }
          })
          .block();

它“有效”,即结果如预期 .

两个问题:

  • 这是对的吗? cache() 会导致 when 等待所有Monos完成吗?

  • 效率高吗?有没有办法让这更快?

谢谢 .

1 回答

  • 0

    你应该尽可能地尝试:

    • 使用Reactor运算符并组成单个反应链

    • 避免将 doOn* 运算符用于副作用以外的其他操作(如日志记录)

    • 避免共享状态

    你的代码可能看起来更像

    List<MetricConfig> metricConfigs = //...
    Mono<List<MetricDataModel>> data = Flux.fromIterable(metricConfigs)
        .flatMap(config -> extractMetrics.queryMetricData(config))
        .collectList();
    

    此外, cache() 运算符不会等待流的完成(实际上是 then() 的作业) .

相关问题