我有一个看起来像这样的集成流程
一个) . 轮询器在排队的通道中丢弃消息
B) . 服务激活器从队列中获取消息,并且我已在轮询器上为此服务激活器指定了一个执行程序
C) . 服务激活器将处理后的消息发送到发布订户信道
d) . 然后,发布订阅 Channels 具有多个消费者,这些消费者接收该消息以供进一步处理
好吧所以我想知道执行者在这个流程中扮演什么角色
a)所以从队列中轮询的服务激活器有一个 Actuator ,可以说有一个10个线程的固定池,并且假设我有每个轮询的最大消息为5所以我假设有5个新消息将在一个处理从游泳池前往5个独立的线程 .
b)假设这是正确的,那么当5种不同的消息到达发布 - 订户信道时会发生什么 . 让我们说这个pub-sub Channels 有3个订阅者..这是否意味着内部将产生3个新线程以异步方式将传入消息传递给3个不同的订阅者..这样订阅者/消费者可以并行处理消息 . 我想在这一点上,对于如何处理事情会有点模糊 . 基本上我想我想知道是否消费者被提供消息和消费者链并行运行..如果那就是那种情况下执行者
任何评论将不胜感激谢谢
基于以下输入,我正在尝试这个
<int:service-activator
input-channel="filesIn"
output-channel="readyFiles"
ref="handler">
<int:poller fixed-delay="3000" max-messages-per-poll="3" />
</int:service-activator>
<int:publish-subscribe-channel id="readyFiles" task-executor="executor">
</int:publish-subscribe-channel>
<int:service-activator
id="consumer1"
input-channel="readyFiles"
ref="handler"
/>
<int:service-activator
id="consumer2"
input-channel="readyFiles"
ref="handler"
/>
<int:service-activator
id="consumer3"
input-channel="readyFiles"
ref="handler"
/>
<task:executor id="executor" pool-size="10" rejection-policy="CALLER_RUNS"/>
我的处理程序简单地减慢了一些事情
public void handle(File file) throws InterruptedException{
log.debug(Thread.currentThread().getName() + " executing for file " + file.getName() + " ...");
Thread.sleep(3000);
log.debug(Thread.currentThread().getName() + " completed for file " + file.getName());
}
因为我在pub-sub通道上使用了一个任务 Actuator ,我希望所有3个消耗应该一起启动,而不是结果是顺序调用
产量
18:44:11.000 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 1.txt ...
18:44:14.001 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 1.txt
18:44:14.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 2.txt ...
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 2.txt
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 3.txt ...
18:44:20.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 3.txt
在前一个订户完成之后调用每个订户 .
1 回答
<publish-subscribe-channel>
具有task-executor
选项以在Executor
的线程内启动处理程序调用 . 这意味着您的订阅者可以并行处理该消息 . 但是,当然,没有保证,因为它取决于Executor
性质和国家 .我认为源代码(
BroadcastingDispatcher
)中的这个快照可以帮助您: