首页 文章

发布订阅者 Channels - 消费者并行运行

提问于
浏览
1

我有一个看起来像这样的集成流程

一个) . 轮询器在排队的通道中丢弃消息

B) . 服务激活器从队列中获取消息,并且我已在轮询器上为此服务激活器指定了一个执行程序

C) . 服务激活器将处理后的消息发送到发布订户信道

d) . 然后,发布订阅 Channels 具有多个消费者,这些消费者接收该消息以供进一步处理

好吧所以我想知道执行者在这个流程中扮演什么角色

a)所以从队列中轮询的服务激活器有一个 Actuator ,可以说有一个10个线程的固定池,并且假设我有每个轮询的最大消息为5所以我假设有5个新消息将在一个处理从游泳池前往5个独立的线程 .

b)假设这是正确的,那么当5种不同的消息到达发布 - 订户信道时会发生什么 . 让我们说这个pub-sub Channels 有3个订阅者..这是否意味着内部将产生3个新线程以异步方式将传入消息传递给3个不同的订阅者..这样订阅者/消费者可以并行处理消息 . 我想在这一点上,对于如何处理事情会有点模糊 . 基本上我想我想知道是否消费者被提供消息和消费者链并行运行..如果那就是那种情况下执行者

任何评论将不胜感激谢谢

基于以下输入,我正在尝试这个

<int:service-activator 
                    input-channel="filesIn"
                    output-channel="readyFiles"
                    ref="handler">
        <int:poller fixed-delay="3000" max-messages-per-poll="3" />
    </int:service-activator>

    <int:publish-subscribe-channel id="readyFiles" task-executor="executor">            
    </int:publish-subscribe-channel>

    <int:service-activator 
                            id="consumer1"
                            input-channel="readyFiles"
                            ref="handler"
                            />

    <int:service-activator 
                            id="consumer2"
                            input-channel="readyFiles"
                            ref="handler"
                            />

    <int:service-activator 
                            id="consumer3"
                            input-channel="readyFiles"
                            ref="handler"
                            />

    <task:executor id="executor" pool-size="10" rejection-policy="CALLER_RUNS"/>

我的处理程序简单地减慢了一些事情

public void handle(File file) throws InterruptedException{
    log.debug(Thread.currentThread().getName() + " executing for file " + file.getName() + " ...");
    Thread.sleep(3000);
    log.debug(Thread.currentThread().getName() + " completed for file " + file.getName());
}

因为我在pub-sub通道上使用了一个任务 Actuator ,我希望所有3个消耗应该一起启动,而不是结果是顺序调用

产量

18:44:11.000 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 1.txt ...
18:44:14.001 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 1.txt
18:44:14.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 2.txt ...
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 2.txt
18:44:17.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 executing for file 3.txt ...
18:44:20.002 DEBUG [task-scheduler-3][com.test.Handler] task-scheduler-3 completed for file 3.txt

在前一个订户完成之后调用每个订户 .

1 回答

  • 0

    <publish-subscribe-channel> 具有 task-executor 选项以在 Executor 的线程内启动处理程序调用 . 这意味着您的订阅者可以并行处理该消息 . 但是,当然,没有保证,因为它取决于 Executor 性质和国家 .

    我认为源代码( BroadcastingDispatcher )中的这个快照可以帮助您:

    for (final MessageHandler handler : handlers) {
        if (this.executor != null) {
            this.executor.execute(new Runnable() {
                @Override
                public void run() {
                    invokeHandler(handler, messageToSend);
                }
            });
            dispatched++;
        }
        else {
            if (this.invokeHandler(handler, messageToSend)) {
                dispatched++;
            }
        }
    }
    

相关问题