首页 文章

WebClient停止从Flux读取时的异常

提问于
浏览
1

我创建了一个返回无限Flux的服务器和一个异步读取响应对象的客户端 . 我希望客户取消订阅Flux并停止处理它 .

服务器的控制器:

@GetMapping(path = "/infinite", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<String> getStreamOfLongs() {
    return Flux.generate(sink -> sink.next("x"));
}

客户端:

WebClient client = WebClient.create("http://localhost:8080");
    Flux<String> flux = client.get()
            .uri("/infinite")
            .accept(TEXT_EVENT_STREAM)
            .retrieve()
            .bodyToFlux(String.class);
    Disposable disposable = flux.subscribe(consumer);
    Executors.newSingleThreadScheduledExecutor().schedule(() -> disposable.dispose(), 5, TimeUnit.SECONDS);

这是取消订阅流的正确方法吗?
当客户端"wants"停止阅读更多数据时需要做什么?

当客户端取消订阅(使用disposable.dispose())时,服务器端会抛出2个异常(IOException和UnsupportedOperationException):

java.io.IOException:sun.nio.ch.FileDispatcherImpl.writev0(本机方法)〜[na:1.8.0_131] sun.nio.ch.SocketDispatcher.writev(SocketDispatcher.java:51)〜[ na:1.8.0_131] at sun.nio.ch.IOUtil.write(IOUtil.java:148)〜[na:1.8.0_131] at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:504)~ [ na:1.8.0_131]在io的io.netty.channel.socket.nio.NioSocketChannel.doWrite(NioSocketChannel.java:403)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] . netty.channel.AbstractChannel $ AbstractUnsafe.flush0(AbstractChannel.java:934)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.nio.AbstractNioChannel $ AbstractNioUnsafe.flush0 (AbstractNioChannel.java:362)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannel $ AbstractUnsafe.flush(AbstractChannel.java:901)~ [netty- transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.DefaultChannelPipeline $ HeadContext.flush(DefaultChannelPipeline.java:1321)~ [netty-transport-4.1.16.Final.j ar:4.1.16.Final]在io.netty的io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext.java:776)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] . channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)~ [netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)~ [netty-transport-4.1.16.Final.jar:4.1.16.Final]在io.netty.channel.CombinedChannelDuplexHandler $ DelegatingChannelHandlerContext.flush(combinedChannelDuplexHandler.java:533)〜[netty-transport-4.1.16.Final . jar:4.1.16.Final]在io.netty的io.netty.channel.ChannelOutboundHandlerAdapter.flush(ChannelOutboundHandlerAdapter.java:115)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] . channel.CombinedChannelDuplexHandler.flush(combinedChannelDuplexHandler.java:358)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractC hannelHandlerContext.java:776)~ [netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)~ [netty-transport-4.1 .16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)~ [netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.ChannelDuplexHandler.flush(ChannelDuplexHandler.java:117)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext . java:776)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush(AbstractChannelHandlerContext.java:768)~ [netty-transport-4.1.16 .Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.flush(AbstractChannelHandlerContext.java:749)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at reactor .ipc.netty.channel.ChannelOperationsHan dler $ PublisherSender.onComplete(ChannelOperationsHandler.java:505)〜[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE] at reactor.core.publisher.FluxMap $ MapSubscriber.onComplete(FluxMap.java:130 )〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxConcatArray $ ConcatArraySubscriber.onComplete(FluxConcatArray.java:184)~ [reactor-core-3.1.1 . RELEASE.jar:3.1.1.RELEASE]在reactor.core.publisher.FluxConcatArray.subscribe(FluxConcatArray.java:80)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor . core.publisher.FluxMap.subscribe(FluxMap.java:62)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Flux.subscribe(Flux.java:6516 )〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.ipc.netty.channel.ChannelOperationsHandler.drain(ChannelOperationsHandler.java:433)〜[reactor-netty-0.7.1 . RELEASE.jar:0.7.1.RELEASE] atreactor.ipc.netty.channel.ChannelOperationsHandler.flush(ChannelOperationsHandler.java:179)〜[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE] at io.netty.channel.AbstractChannelHandlerContext.invokeFlush0(AbstractChannelHandlerContext .java:776)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:802)~ [netty-transport-4.1 . 16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:814)~ [netty-transport-4.1.16.Final.jar:4.1.16.Final] at at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:794)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java :831)〜[netty-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1041)~ [nett y-transport-4.1.16.Final.jar:4.1.16.Final] at io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:300)〜[netty-transport-4.1.16.Final.jar:4.1在reactor.ipc的reactor.ipc.netty.NettyOutbound.lambda $ sendObject $ 6(NettyOutbound.java:298)〜[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE]的.16.Final] . netty.FutureMono $ DeferredFutureMono.subscribe(FutureMono.java:106)〜[reactor-netty-0.7.1.RELEASE.jar:0.7.1.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono.java:2913 )〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.ipc.netty.NettyOutbound.subscribe(NettyOutbound.java:356)〜[reactor-netty-0.7.1.RELEASE . jar:0.7.1.RELEASE] at reactor.core.publisher.FluxConcatMap $ ConcatMapDelayed.drain(FluxConcatMap.java:744)[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core .publisher.FluxConcatMap $ ConcatMapDelayed.onNext(FluxConcatMap.java:581)[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxMap $ MapSubscriber.onNext(FluxMap.java :108)〜[reacto r-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]在org.springframework.http.server.reactive.ChannelSendOperator $ WriteBarrier.onNext(ChannelSendOperator.java:150)~ [spring-web-5.0.1 .RELEASE.jar:5.0.1.RELEASE] at reactor.core.publisher.FluxMapFuseable $ MapFuseableSubscriber.onNext(FluxMapFuseable.java:115)~ [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxGenerate $ GenerateSubscription.next(FluxGenerate.java:164)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at io.github.msayag.webflux.MyController . lambda $ getStreamOfLongs $ 0(MyController.java:44)〜[classes /:na] ... at io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:858)~ [netty-common-4.1.16 .Final.jar:4.1.16.Final]在java.lang.Thread.run(Thread.java:748)〜[na:1.8.0_131]

其次是

2017-11-24 01:04:09.476 ERROR 83663 --- [ctor-http-nio-2] oswsadapter.HttpWebHandlerAdapter:无法处理java.util.Collections $ UnmodifiableMap中的请求java.lang.UnsupportedOperationException:null . 把(Collections.java:1457)〜[na:1.8.0_131]放在org.springframework.http.HttpHeaders.set(HttpHeaders.java:1439)〜[spring-web-5.0.1.RELEASE.jar:5.0.1 .RELEASE] org.springframework.http.HttpHeaders.setContentType(HttpHeaders.java:849)〜[spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE] org.springframework.boot.autoconfigure.web org.springframework.boot.autoconfigure.web.reactive中的.reactive.error.AbstractErrorWebExceptionHandler.write(AbstractErrorWebExceptionHandler.java:235)〜[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6] . error.AbstractErrorWebExceptionHandler.lambda $ handle $ 1(AbstractErrorWebExceptionHandler.java:228)〜[spring-boot-autoconfigure-2.0.0.M6.jar:2.0.0.M6] at reactor.core.publisher.MonoFlatMap $ FlatMapMain.onNext( MonoFlatMap.java:118)[reactor-core-3.1.1.RELEASE.jar: 3.1.1.RELEASE]在reactor.core上的reactor.core.publisher.Operators $ MonoSubscriber.complete(Operators.java:1092)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] . publisher.MonoFlatMap $ FlatMapInner.onNext(MonoFlatMap.java:241)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxPeekFuseable $ PeekFuseableSubscriber.onNext(FluxPeekFuseable.java :198)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Operators $ ScalarSubscription.request(Operators.java:1649)~ [reactor-core-3.1 . 1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxPeekFuseable $ PeekFuseableSubscriber.request(FluxPeekFuseable.java:138)reactor.core.publisher.MonoFlatMap上的〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] $ FlatMapInner.onSubscribe(MonoFlatMap.java:230)〜[reactor-core-3.1.1.RELEASE .jar:3.1.1.RELEASE] at reactor.core.publisher.FluxPeekFuseable $ PeekFuseableSubscriber.onSubscribe(FluxPeekFuseable.java:172)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor .core.publisher.MonoJust.subscribe(MonoJust.java:54)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.MonoPeekFuseable.subscribe(MonoPeekFuseable.java: 74)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.MonoFlatMap $ FlatMapMain.onNext(MonoFlatMap.java:150)[reactor-core-3.1.1 . RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxSwitchIfEmpty $ SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at at reactor.core.publisher.FluxSwitchIfEmpty $ SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at at reactor.core.publisher.Operators $ ScalarSubscription.request(Operators.java:1649)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Operators $ MultiSubscriptionSubscriber.set (Operators.java:1463)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Operators $ MultiSubscriptionSubscriber.onSubscribe(Operators.java:1337)~ [reactor- core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1 .RELEASE] at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Mono.subscribe (Mono.java:2913)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxSwitchIfEmpty $ SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:75)〜[reactor- core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Operators.complete(Operators.java:125)~ [reactor-core-3.1.1.RELEASE.jar:3.1.1 .R ELEASE]于reactor.core.publisher.MonoEmpty.subscribe(MonoEmpty.java:45)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.MonoSwitchIfEmpty.subscribe(在reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)〜[reactor-core-3.1]的MonoSwitchIfEmpty.java:44)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] .1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE]在reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono . java:2913)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxOnErrorResume $ ResumeSubscriber.onError(FluxOnErrorResume.java:97)〜[reactor-core-3.1 reactor.1ore.publisher.MonoFlatMap上的.1.RELEASE.jar:3.1.1.RELEASE] $ FlatMapMain.secondError(MonoFlatMap.java:185)[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE在达到tor.core.publisher.MonoFlatMap $ FlatMapInner.onError(MonoFlatMap.java:251)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxOnErrorResume $ ResumeSubscriber.onError (FluxOnErrorResume.java:100)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Operators.error(Operators.java:175)〜[reactor-core- 3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxFlatMap.trySubscribeScalarMap(FluxFlatMap.java:129)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE ] reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:53)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.Mono.subscribe(Mono) .java:2913)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxOnErrorResume $ ResumeSubscriber.onError(FluxOnErrorResume.java:97)〜[reactor-core- 3.1.1.RELEASE.jar:3.1.1.RELEASE] at org.springframework.http.server.reactive.ChannelSendOperator $ WriteCompletionBarrier.onError(ChannelSendOperator.java:339)~ [在reactor.core.publisher.MonoNext $ NextSubscriber.onError(MonoNext.java:87)〜[reactor-core-3.1.1.RELEASE.jar]的spring-web-5.0.1.RELEASE.jar:5.0.1.RELEASE] :reactor.core.publisher.Operators的$ 3.1.1.RELEASE]在reactor.core上的$ MultiSubscriptionSubscriber.onError(Operators.java:1332)~ [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] .publisher.Operators $ MonoSubscriber.onError(Operators.java:1135)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] atreactor.core.publisher.MonoIgnoreThen $ ThenAcceptInner.onError(MonoIgnoreThen.java:300)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.MonoIgnoreElements $ IgnoreElementsSubscriber.onError (MonoIgnoreElements.java:75)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxConcatMap $ ConcatMapDelayed.drain(FluxConcatMap.java:660)~ [reactor- reactor-3.1.1.RELEASE.jar:3.1.1.RELEASE] at reactor.core.publisher.FluxConcatMap $ ConcatMapDelayed.onNext(FluxConcatMap.java:581)~ [reactor-core-3.1.1.RELEASE.jar:3.1 .1.RELEASE]在reactor.core.publisher.FluxMap $ MapSubscriber.onNext(FluxMap.java:108)〜[reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] org.springframework.http reactor.reactive.ChanlisherFuseable $ MapFuseableSubscriber.onNext reactor.cor中的FluxMapFuseable.java:115)~ [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] e.publisher.FluxGenerate $ GenerateSubscription.next(FluxGenerate.java:164)~ [reactor-core-3.1.1.RELEASE.jar:3.1.1.RELEASE] at io.github.msayag.webflux.MyController.lambda $ getStreamOfLongs $ 0(MyController.java:44)〜[classes /:na] ... at io.netty.util.concurrent.SingleThreadEventExecutor $ 5.run(SingleThreadEventExecutor.java:858)~ [netty-common-4.1.16.Final . jar:4.1.16.Final] at java.lang.Thread.run(Thread.java:748)~ [na:1.8.0_131]

1 回答

  • 0

    据我所知,你这样做是正确的 .

    Disposable::dispose 有效地取消了流,当你不再对接收数据感兴趣时,你应该从 Subscriber 的角度来做 .

    WebClient 侧调用它将导致关闭HTTP连接 . 我不知道't think there'告诉服务器你不想再接收数据了 . 使用HTTP / 2,事情可能会有所不同,因为可以在不关闭整个连接的情况下关闭HTTP流 .

    从服务器的角度来看,客户端自动取消看起来与因错误而关闭连接的客户端相同 . 所以例外情况都表明了这一点

    • 连接在服务器尝试写入时关闭

    • 响应未正确处理(服务器仍有待写的东西)

    如果您对此行为有改进意见,请在https://jira.spring.io上创建一张票

相关问题