我一直在使用webflux启动器( spring-boot-starter-webflux
)使用spring-boot 2.0.0.RC1
. 我创建了一个简单的控制器,返回无限的通量 . 我希望发布者只有在有客户端(订阅者)时才能正常工作 . 假设我有一个像这样的控制器:
@RestController
public class Demo {
@GetMapping(value = "/")
public Flux<String> getEvents(){
return Flux.create((FluxSink<String> sink) -> {
while(!sink.isCancelled()){
// TODO e.g. fetch data from somewhere
sink.next("DATA");
}
sink.complete();
}).doFinally(signal -> System.out.println("END"));
}
}
现在,当我尝试运行该代码并使用Chrome访问 endpoints http://localhost:8080/时,我可以看到数据 . 但是,一旦我关闭浏览器,while循环就会继续,因为没有触发取消事件 . How can I terminate/cancel the streaming as soon as I close the browser?
从这个answer我引用:
目前使用HTTP,确切的背压信息不会通过网络传输,因为HTTP协议不支持此功能 . 如果我们使用不同的线路协议,这可能会改变 .
我认为,由于HTTP协议不支持背压,这意味着也不会发出取消请求 .
通过分析网络流量进一步调查,显示浏览器在关闭浏览器后立即发送TCP FIN
. 有没有办法配置Netty(或其他),以便半封闭连接将触发发布者的取消事件,使while循环停止?
或者我是否必须编写自己的适配器,类似于 org.springframework.http.server.reactive.ServletHttpHandlerAdapter
,我实现自己的订阅服务器?
谢谢你的帮助 .
EDIT: 如果没有客户端,则会在尝试将数据写入套接字时引发 IOException
. 正如您在stack trace中看到的那样 .
但这还不够好,因为在下一个数据块准备好发送之前可能需要一段时间,因此需要花费相同的时间来检测已经消失的客户端 . 正如Brian Clozel's answer中指出的那样,这是Reactor Netty中的一个已知问题 . 我试图通过将依赖项添加到 POM.xml
来使用Tomcat . 像这样:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>
虽然它取代了Netty并使用了Tomcat,但由于浏览器没有显示任何数据,它似乎没有被动反应 . 但是,控制台中没有警告/信息/异常 . Is spring-boot-starter-webflux as of this version (2.0.0.RC1) supposed to work together with Tomcat?
2 回答
由于这是一个已知问题(请参阅Brian Clozel's answer),我最终使用一个
Flux
来获取我的真实数据并使用另一个来实现某种ping /心跳机制 . 结果,我将两者合并在一起Flux.merge()
.在这里,您可以看到我的解决方案的简化版本:
我将所有内容都包装起来
ServerSentEvent<? extends Notification>
.Notification
只是一个标记界面 . 我使用ServerSentEvent
类中的event
字段来分隔数据和ping事件 . 由于心跳Flux
不断地以短间隔发送事件,因此检测客户端消失所花费的时间最多是该间隔的长度 . 请记住,我需要这样做,因为在我得到一些可以发送的真实数据之前可能需要一段时间,因此,在检测到客户端消失之前可能还需要一段时间 . 像这样,它会在无法发送ping(或可能是消息事件)时检测到客户端已经消失 .标记界面的最后一个注释,我称之为通知 . 这不是必需的,但它提供了一些类型的安全性 . 没有它,我们可以编写
Flux<ServerSentEvent<?>>
而不是Flux<ServerSentEvent<? extends Notification>>
作为getNotificationStream()方法的返回类型 . 或者也可以,让getHeartbeatStream()返回Flux<ServerSentEvent<MyData>>
. 但是,像这样它可以允许任何对象被发送,这是我不想要的 . 结果,我添加了界面 .我不确定为什么会这样,但我怀疑这是因为选择了生成算子 . 我认为使用以下方法可行:
根据to Reactor's reference documentation, you're probably hitting the key difference between generate and push(我相信使用generate的非常类似的方法可能也会起作用) .
我的评论是指背压信息(
Subscriber
愿意接受多少元素),但成功/错误信息是通过网络传达的 .根据您选择的Web服务器(Reactor Netty,Tomcat,Jetty等),关闭客户端连接可能会导致:
在服务器端收到取消信号(我认为这是由Netty支持的)
服务器在关闭_357478时收到错误信号(我相信Servlet规范没有提供该回调,我们错过了取消信息) .
简而言之:您不需要做任何特殊的事情,它应该已经得到支持,但是您的
Flux
实现可能是这里的实际问题 .Update: this is a known issue in Reactor Netty