首页 文章

spring 反应器螺纹模型

提问于
浏览
0

Spring Webflux的新手警报(v 2.0.1.RELEASE) .

我想将Spring Webflux用于后端(Webless)应用程序,以便从JMS侦听器处理大量数据 .

我的理解是Spring Webflux提供了一种非阻塞/异步并发模型 . 但是,我得到了一个基本问题,我需要一些帮助 . 作为一个免责声明,这个反应式编程的整个概念对我来说是一个新的东西,我仍然处于这种范式转换的过程中 .

考虑以下代码:

Mono.just("ONE")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

Mono.just("TWO")
.map(item -> func(" A " + item))
.map(item -> func(" B " + item))
.map(item -> func(" C " + item))
.subscribe(System.out::println);

我从文档中了解到,在调用“subscribe”函数之前,事件处理链没有任何反应 .

但是在内部,spring使用(如果愿意)对“map”函数中的每个函数异步使用单独的线程?如果spring为这些链使用“单个”线程,那么这里的真正目的是什么?它不是基于不同语法的阻塞和单线程模型吗?

我观察到代码总是按顺序运行并且具有相同的线程 . 什么是spring webflux的线程模型?

2 回答

  • 2

    反应式编程是一种编程范例,因此它不会对技术实现做出任何假设 .

    The reactive manifesto描述了反应系统,并在表上引入了异步通信和背压 . 除此之外,它也没有对技术细节做出任何假设 .

    Spring Reactor是Webflux的基础,它是一个库,允许您轻松构建反应系统并遵循反应式编程范例 .

    流使用的线程取决于发布者 . 默认是使用当前线程 . 如果发布者是同步的,则在没有任何干预的情况下,流不能是异步的 . 如果发布者阻止,则流阻塞 . 但是请看以下示例:

    Flux.interval(Duration.ofMillis(100))
        .take(2)
        .subscribe(i -> System.out.println(Thread.currentThread().getName()));
    

    Flux.interval 在另一个线程上发布,因此链在另一个线程中异步运行 .

    让我们看另一个例子:

    Scheduler scheduler = Schedulers.newElastic("foo");
    
    Flux<Integer> flux = Flux.just(1, 2)
        .subscribeOn(scheduler);
    
    flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
    flux.subscribe(i -> System.out.println(Thread.currentThread().getName()));
    

    您会注意到每个订阅者都在自己的线程上运行(尽管来自同一个线程池) . publishOn 运算符类似 .

    如果您订阅了发布者,则可以使用相同的编程范例,无论它是同步还是异步 . 并且您始终可以通过添加 subscribeOnpublishOn 运算符来引入异步行为 .

  • 0

    TL; DR:

    • 它's a Project Reactor thing, Spring-Webflux doesn' t决定在哪个线程上运行什么操作 .

    • Project Reactor可以更容易地告诉您're crossing the thread boundaries. Also, there'没有(显式)同步正在进行中;使并发问题更难引入 .

    不,它不是具有不同语法的单线程模型 . Project Reactor尽可能尝试使用主线程来避免上下文切换 . 此外,它还提供了特殊的运算符,允许您指定先前运行的线程 .

    例如,这个修改过的例子将在不同的线程上运行; as subscribeOn 运算符定义整个链运行的线程池:

    Mono.just("ONE")
        .map(item -> func(" A " + item))
        .map(item -> func(" B " + item))
        .map(item -> func(" C " + item))
        .subscribeOn(Schedulers.elastic())
        .subscribe(item -> {
            System.out.println(Thread.currentThread().getName() + " " + item);
        });
    
    Mono.just("TWO")
        .map(item -> func(" A " + item))
        .map(item -> func(" B " + item))
        .map(item -> func(" C " + item))
        .subscribeOn(Schedulers.elastic())
        .subscribe(item -> {
            System.out.println(Thread.currentThread().getName() + " " + item);
        });
    

    在这种情况下,两个操作都在 elastic-x 线程上执行;没有阻塞主线程 . 每次运行时,操作的顺序可能会有所不同 .

相关问题