首页 文章

Spring Boot Webflux / Netty - 检测关闭连接

提问于
浏览
5

我一直在使用webflux启动器( spring-boot-starter-webflux )使用spring-boot 2.0.0.RC1 . 我创建了一个简单的控制器,返回无限的通量 . 我希望发布者只有在有客户端(订阅者)时才能正常工作 . 假设我有一个像这样的控制器:

@RestController
public class Demo {

    @GetMapping(value = "/")
    public Flux<String> getEvents(){
        return Flux.create((FluxSink<String> sink) -> {

            while(!sink.isCancelled()){

                // TODO e.g. fetch data from somewhere

                sink.next("DATA");
            }
            sink.complete();
        }).doFinally(signal -> System.out.println("END"));
    }

}

现在,当我尝试运行该代码并使用Chrome访问 endpoints http://localhost:8080/时,我可以看到数据 . 但是,一旦我关闭浏览器,while循环就会继续,因为没有触发取消事件 . How can I terminate/cancel the streaming as soon as I close the browser?

从这个answer我引用:

目前使用HTTP,确切的背压信息不会通过网络传输,因为HTTP协议不支持此功能 . 如果我们使用不同的线路协议,这可能会改变 .

我认为,由于HTTP协议不支持背压,这意味着也不会发出取消请求 .

通过分析网络流量进一步调查,显示浏览器在关闭浏览器后立即发送TCP FIN . 有没有办法配置Netty(或其他),以便半封闭连接将触发发布者的取消事件,使while循环停止?

或者我是否必须编写自己的适配器,类似于 org.springframework.http.server.reactive.ServletHttpHandlerAdapter ,我实现自己的订阅服务器?

谢谢你的帮助 .

EDIT: 如果没有客户端,则会在尝试将数据写入套接字时引发 IOException . 正如您在stack trace中看到的那样 .

但这还不够好,因为在下一个数据块准备好发送之前可能需要一段时间,因此需要花费相同的时间来检测已经消失的客户端 . 正如Brian Clozel's answer中指出的那样,这是Reactor Netty中的一个已知问题 . 我试图通过将依赖项添加到 POM.xml 来使用Tomcat . 像这样:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-tomcat</artifactId>
</dependency>

虽然它取代了Netty并使用了Tomcat,但由于浏览器没有显示任何数据,它似乎没有被动反应 . 但是,控制台中没有警告/信息/异常 . Is spring-boot-starter-webflux as of this version (2.0.0.RC1) supposed to work together with Tomcat?

2 回答

  • 0

    由于这是一个已知问题(请参阅Brian Clozel's answer),我最终使用一个 Flux 来获取我的真实数据并使用另一个来实现某种ping /心跳机制 . 结果,我将两者合并在一起 Flux.merge() .

    在这里,您可以看到我的解决方案的简化版本:

    @RestController
    public class Demo {
    
        public interface Notification{}
    
        public static class MyData implements Notification{
            …
            public boolean isEmpty(){…}
        }
    
        @GetMapping(value = "/", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
        public Flux<ServerSentEvent<? extends Notification>> getNotificationStream() {
            return Flux.merge(getEventMessageStream(), getHeartbeatStream());
        }
    
        private Flux<ServerSentEvent<Notification>> getHeartbeatStream() {
            return Flux.interval(Duration.ofSeconds(2))
                    .map(i -> ServerSentEvent.<Notification>builder().event("ping").build())
                    .doFinally(signalType ->System.out.println("END"));
        }
    
        private Flux<ServerSentEvent<MyData>> getEventMessageStream() {
            return Flux.interval(Duration.ofSeconds(30))
                    .map(i -> {
    
                        // TODO e.g. fetch data from somewhere,
                        // if there is no data return an empty object
    
                        return data;
                    })
                    .filter(data -> !data.isEmpty())
                    .map(data -> ServerSentEvent
                            .builder(data)
                            .event("message").build());
        }
    }
    

    我将所有内容都包装起来 ServerSentEvent<? extends Notification> . Notification 只是一个标记界面 . 我使用 ServerSentEvent 类中的 event 字段来分隔数据和ping事件 . 由于心跳 Flux 不断地以短间隔发送事件,因此检测客户端消失所花费的时间最多是该间隔的长度 . 请记住,我需要这样做,因为在我得到一些可以发送的真实数据之前可能需要一段时间,因此,在检测到客户端消失之前可能还需要一段时间 . 像这样,它会在无法发送ping(或可能是消息事件)时检测到客户端已经消失 .

    标记界面的最后一个注释,我称之为通知 . 这不是必需的,但它提供了一些类型的安全性 . 没有它,我们可以编写 Flux<ServerSentEvent<?>> 而不是 Flux<ServerSentEvent<? extends Notification>> 作为getNotificationStream()方法的返回类型 . 或者也可以,让getHeartbeatStream()返回 Flux<ServerSentEvent<MyData>> . 但是,像这样它可以允许任何对象被发送,这是我不想要的 . 结果,我添加了界面 .

  • 4

    我不确定为什么会这样,但我怀疑这是因为选择了生成算子 . 我认为使用以下方法可行:

    return Flux.interval(Duration.ofMillis(500))
        .map(input -> {
            return "DATA";
        });
    

    根据to Reactor's reference documentation, you're probably hitting the key difference between generate and push(我相信使用generate的非常类似的方法可能也会起作用) .

    我的评论是指背压信息( Subscriber 愿意接受多少元素),但成功/错误信息是通过网络传达的 .

    根据您选择的Web服务器(Reactor Netty,Tomcat,Jetty等),关闭客户端连接可能会导致:

    • 在服务器端收到取消信号(我认为这是由Netty支持的)

    • 服务器在关闭_357478时收到错误信号(我相信Servlet规范没有提供该回调,我们错过了取消信息) .

    简而言之:您不需要做任何特殊的事情,它应该已经得到支持,但是您的 Flux 实现可能是这里的实际问题 .

    Update: this is a known issue in Reactor Netty

相关问题