我写了这段代码来分拆大量的WebClients(受 reactor.ipc.netty.workerCount
限制),立即启动Mono,等待所有Monos完成:
List<Mono<List<MetricDataModel>>> monos = new ArrayList<>(metricConfigs.size());
for (MetricConfig metricConfig : metricConfigs) {
try {
monos.add(extractMetrics.queryMetricData(metricConfig)
.doOnSuccess(result -> {
metricDataList.addAll(result);
})
.cache());
} catch (Exception e) {
}
}
Mono.when(monos)
.doFinally(onFinally -> {
Map<String, Date> latestMap;
try {
latestMap = extractInsights.queryInsights();
Transform transform = new Transform(copierConfig.getEventType());
ArrayList<Event> eventList = transform.toEvents(latestMap, metricDataList);
} catch (Exception e) {
log.error("copy: mono: when: {}", e.getMessage(), e);
}
})
.block();
它“有效”,即结果如预期 .
两个问题:
-
这是对的吗?
cache()
会导致when
等待所有Monos完成吗? -
效率高吗?有没有办法让这更快?
谢谢 .
1 回答
你应该尽可能地尝试:
使用Reactor运算符并组成单个反应链
避免将
doOn*
运算符用于副作用以外的其他操作(如日志记录)避免共享状态
你的代码可能看起来更像
此外,
cache()
运算符不会等待流的完成(实际上是then()
的作业) .