我正在使用webflux进行一个小概念验证 . 在我的应用程序的一部分中,我想与数据库(通过JDBC)进行通信,该数据库阻塞并且不适合反应堆 . 然而,对于这个概念验证,我正在考虑以下技巧:
-
将专用线程池(让我们称之为
DBThreadPool
)定义为ExecutorService
,其中固定数量的线程等于JDBC连接池大小 . 在Reactor Scheduler中包装该池(名为dbScheduller
) -
换行阻塞调用(如Reactor doc中所述)并在
dbScheduller
上安排它:
public Flux<DbUser> allUsers() {
return Mono.fromCallable(() -> <jdbcQueryHere>)
.flatMapIterable(Function.identity())
.log("DB-OPER").subscribeOn(dbScheduller);
}
- 一旦查询完成,我想处理返回的
Flux<DbUser>
与一些被动运算符但是在 container thread 中执行它们所以我可以释放数据库线程 . 根据Reactor doc,它可以通过publishOn
方法完成:
public Mono<ServerResponse> allUsers(ServerRequest request) {
return ServerResponse.ok()
.contentType(APPLICATION_STREAM_JSON)
.body(
usersDao.allUsers()
.publishOn(<netty Thread pool as scheduller>)
.map(<some function>)
.filter(<some predicate>),
DbUser.class);
}
你认为这是一种可行的方法(直到我们得到数据库的非阻塞驱动程序)?
如何访问容器(netty)线程池,以便其中一个线程可用于最终确定(从DB处理后数据并写入响应)HTTP请求?
我知道我可以单独使用数据库线程(通过省略 publishOn
)来完成HTTP请求,但我想尽快发布它(因此它可以被另一个也需要访问数据库的请求重用)并留下休息对netty托管线程的工作(这可能是耗时的)(它可能是最初执行我的 handler method
的线程) .
1 回答
我个人认为这是一种有效的方法 . BUT 许多其他人(他们中的大多数人比我更了解反应性编程)怀疑这是一个好主意 .
你的选择:
Pro:清除分离阻塞和非阻塞代码 .
Con:包括网络在内的另一个服务边界 .
亲:没有服务间沟通 .
Con:降低实际响应的应用程序百分比 .
如果您只是渲染响应,则不需要,因为这会自动发生在netty线程上 .
如果你有实际的转换,那么就没有任何现成的做法 . 我想可以使用一些Netty API并基于此实现
Scheduler
. 但我对Netty的帮助不够了解 .为了回答后续问题,我有点问自己:
publishOn
方法并非真正用于此目的 . 它们的用例更多地用于例如Swing应用程序,在管道的末尾,您需要使用某个线程或线程池 .我个人的感觉是Reactor应该有这样的东西,但开发人员目前不相信这是一个好主意 . 所以我建议你认为它对你的申请真的有意义:open a ticket并准备好争论你的观点 . 要么你成功,要么我们都知道为什么这是一个坏主意:-)
很高兴你问你是否在一个反应性管道中的线程池上阻塞东西你应该考虑如何在线程池(或相应的资源)被重载时处理场景,即它处理的事件比它们生成的要慢 .