我使用的是spring-boot版本2.0.0.M6 . 我需要从spring-boot应用程序发出异步HTTP调用说APP1到另一个应用程序(播放框架)说APP2 . 因此,如果我需要从APP1到APP2进行20次不同的异步调用,APP2会收到20个请求,其中很少是重复请求,这意味着这些重复请求替换了几个不同的请求 . 预期:
api/v1/call/1
api/v1/call/2
api/v1/call/3
api/v1/call/4
实际:
api/v1/call/1
api/v1/call/2
api/v1/call/4
api/v1/call/4
我正在使用Spring反应式WebClient .
下面是build.gradle中的spring boot版本
buildscript {
ext {
springBootVersion = '2.0.0.M6'
//springBootVersion = '2.0.0.BUILD-SNAPSHOT'
}
repositories {
mavenCentral()
maven { url "https://repo.spring.io/snapshot" }
maven { url "https://repo.spring.io/milestone" }
maven {url "https://plugins.gradle.org/m2/"}
}
dependencies {
classpath("org.springframework.boot:spring-boot-gradle-plugin:${springBootVersion}")
classpath("se.transmode.gradle:gradle-docker:1.2")
}
}
我的WebClient init片段
private WebClient webClient = WebClient.builder()
.clientConnector(new ReactorClientHttpConnector((HttpClientOptions.Builder builder) -> builder.disablePool()))
.build();
我的POST方法
public <T> Mono<JsonNode> postClient(String url, T postData) {
return Mono.subscriberContext().flatMap(ctx -> {
String cookieString = ctx.getOrDefault(Constants.SubscriberContextConstnats.COOKIES, StringUtils.EMPTY);
URI uri = URI.create(url);
return webClient.post().uri(uri).body(BodyInserters.fromObject(postData)).header(HttpHeaders.COOKIE, cookieString)
.exchange().flatMap(clientResponse ->
{
return clientResponse.bodyToMono(JsonNode.class);
})
.onErrorMap(err -> new TurtleException(err.getMessage(), err))
.doOnSuccess(jsonData -> {
});
});
}
调用此postClient方法的代码
private void getResultByKey(PremiumRequestHandler request, String key, BrokerConfig brokerConfig) {
/* Live calls for the insurers */
LOG.info("[PREMIUM SERVICE] LIVE CALLLLL MADE FOR: " + key + " AND REQUEST ID: " + request.getRequestId());
String uri = brokerConfig.getHostUrl() + verticalResolver.determineResultUrl(request.getVertical()) + key;
LOG.info("[PREMIUM SERVICE] LIVE CALL WITH URI : " + uri + " FOR REQUEST ID: " + request.getRequestId());
Mono<PremiumResponse> premiumResponse = reactiveWebClient.postClient(uri, request.getPremiumRequest())
.map(json -> PlatformUtils.mapToClass(json, PremiumResponse.class));
premiumResponse.subscribe(resp -> {
resp.getPremiumResults().forEach(result -> {
LOG.info("Key " + result.getKey());
repository.getResultRepoRawType(request.getVertical())
.save(result).subscribe();
saveResult.subscriberContext(ctx -> {
MultiBrokerMongoDBFactory.setDatabaseNameForCurrentThread(brokerConfig.getBroker());
return ctx;
}).subscribe();
});
}, error -> {
LOG.info("[PREMIUM SERVICE] ERROR RECIEVED FOR " + key + " AND REQUEST ID" + request.getRequestId() + " > " + error.getMessage());
});
}
如果将日志放在客户端代码的终点,则无法在该点看到多个请求 .
可能是WebClient中的一个错误,其中URI在多线程环境中被交换 .
尝试改变WebClient,仍然会交换URI
请帮忙 .
Git Repo添加github.com/praveenk007/ps-demo
1 回答
加上我的一些观察:
webClient.get()
或webClient.post()
总是在每次调用URI
的调用时返回新DefaultRequestBodyUriSpec
,我认为它看起来不像URI
被交换 .和
methodInternal
方法如下所示此外,在同时发出实际请求时,会创建新的
ClientRequest
.类来源
https://github.com/spring-projects/spring-framework/blob/master/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java