首页 文章

Spring集成

提问于
浏览
6

我们的应用程序是使用Spring Integration Framework设计的 . 完成消息动作流程开始于监听已使用JMS消息驱动适配器的Queue,之后已定义基于通道的队列 endpoints ,并且每个 endpoints 由Service-Activators处理 .

我们目前正处于性能阶段,我们正在产生200条消息请求 . 最初我们观察到消息没有并行执行,在做了一些阅读后发现通过将并发消费者和最大并发消费者属性添加到JMS消息驱动的侦听器适配器将有助于启用多线程模式 . 确实这有帮助,但仍然介于这个过程之间,我仍然看到单线程效应 . 这是否由于 endpoints 已定义?向每个 endpoints 添加队列容量有什么好处?您认为通过向每个Channel endpoints 定义添加队列容量将再次有助于在多线程模式下运行 .

请求的设计快照:

action flow

3 回答

  • 0

    查看 Channels 的确切定义会很有帮助 .

    默认情况下,Spring通道在发送方's thread. In other words, it'同步中使用其消息 . 如果您希望通道异步使用消息,则必须指定TaskExecutor . 见http://static.springsource.org/spring/docs/3.0.5.RELEASE/reference/scheduling.html

  • 0

    查看您的流程图,看起来流程中有许多单线程元素,并且可以进行优化以实现更高的并发性,希望更高的吞吐量 .

    要从消息驱动的通道适配器(您尚未显示配置)开始,可以配置 to have more than 1 default consumer ,并且可以设置为 consume a reasonable number of message per consume cycle .

    通过消息驱动的通道适配器,将消息放入直接通道1的线程将遗憾地运行剩余的流程,因为其他地方没有缓冲,因此当您的消息被放入"Direct Channel 1"时,它将立即调用路由器然后在同一个线程中调用服务激活器和同一线程中的Mail适配器或JMS Outbound通道适配器 . 这里的更改可能是 introduce a queue channel instead of direct channel 1 ,这样消耗消息的线程只是将消息放入队列通道然后用它完成 .

    超越直接通道1(更改为队列通道1),我认为可以根据流量的快或慢来进行单线程, if say the mail adapter is slow then Direct Channel 4 can be made a queue channel also ,与 Direct Channel 5 相同

    您能否看看这些以粗体突出显示的更改是否有助于改善流程

  • 0

    为了提高性能,我建议使用带有任务 Actuator 的executorchannel来控制线程池大小的数量 . 通过这种方式,您可以获得消息到达jms队列时消费者接收消息并在单独的线程中处理流的情况 . 请记住,在这种配置中,多线程工作由taskexecutor通道执行taht将在单独的线程中执行消息的重新生成,因此您已经想到了您想要的多线程等级 .

    对于队列消息通道,确实需要轮询器在通道上轮询以执行recive,队列容量是幕后原子队列的容量 .

    您可以在xml中以这种方式配置执行程序通道

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:si="http://www.springframework.org/schema/integration"
           xmlns:tx="http://www.springframework.org/schema/task"
           xsi:schemaLocation="http://www.springframework.org/schema/beans 
           http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/integration 
            http://www.springframework.org/schema/integration/spring-integration.xsd 
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
           <tx:executor id="taskExecutor" pool-size="10" queue-capacity="10"/>
           <si:channel id="ch" >
                  <si:dispatcher task-executor="taskExecutor"/>       
           </si:channel>
    </beans>
    

    或者以这种方式在java-dsl中

    @Bean
        public IntegrationFlow storeBookPageByPage(ConnectionFactory connectionFactory,
                                                   @Qualifier("createBookQueue") Destination createBookQueue,
                                                   @Qualifier("createBookResultQueue") Destination createBookResultQueue,
                                                   PdfBookMasterRepository pdfBookMasterRepository,
                                                   BookRepository bookRepository){
            String tempFilePathBaseDir = environment.getProperty("bookService.storeBookPageByPage.tempFilePathBaseDir");
    
            return IntegrationFlows.from(Jms.messageDriverChannelAdapter(connectionFactory)
                    .destination(createBookQueue)
                    .errorChannel(storeBookPageByPageErrorChannel()))
                    .channel(channels -> channels.executor(Executors.newScheduledThreadPool(5)))
                    .....
    
            }
    

相关问题