首页 文章

Spring Integration多个不同时处理的消费者

提问于
浏览
0

我正在使用Spring与ActiveMQ集成 . 我使用maxConcurrentConsumers = 5定义了一个DefaultMessageListenerContainer . 它在a中引用 . 在int-xml:validating-filter和int-xml:unmarshalling-transformer之后,我定义了一个队列通道actionInstructionTransformed . 我有一个这个队列通道的轮询器 . 当我启动我的应用程序时,在ActiveMQ控制台中,我可以看到在五个会话中创建了一个连接 .

现在,我有一个带有注释方法的 @MessageEndpoint

@ServiceActivator(inputChannel = "actionInstructionTransformed", poller = @Poller(value = "customPoller")).

我在方法入口处有一个日志声明 . 每条消息的处理时间很长(几分钟) . 在我的日志中,我可以看到 thread-1 开始处理,然后我只能看到 thread-1 输出 . 只有当 thread-1 处理完1条消息时,我才会看到 thread-2 开始处理下一条消息,等等 . 我的类中没有任何同步块注释 @MessageEndpoint . 我没有设法同时获得 thread-1thread-2 等处理消息 .

有没有人经历过类似的事情?

1 回答

  • 0

    你看,你说:

    在int-xml:validating-filter和int-xml:unmarshalling-transformer之后,我定义了一个队列通道actionInstructionTransformed .

    现在让我们去 QueueChannelPollingConsumer definitions

    另一方面,连接到实现org.springframework.messaging.PollableChannel接口(例如QueueChannel)的通道的通道适配器将生成PollingConsumer的实例 .

    并注意 @PollerPollerMetadata )有 taskExecutor 选项 .

    默认情况下, TaskScedhuler 根据 trigger 配置定期询问 QueueChannel 数据 . 如果这是 PeriodicTrigger ,使用默认选项,如 fixedRate = false ,下一个轮询确实发生在前一个轮询之后 . 这就是为什么你只看到一个线程 .

    因此,尝试配置 taskExecutor ,来自该队列的消息将并行执行 .

    DefaultMessageListenerContainer 上的 concurrency 无效 . 因为最后你将所有这些消息放到 QueueChannel . 在这里,一个新的线程模型开始基于 @Poller 配置工作 .

相关问题